From 59e1ea53a8e3b2d4d132ce1a86e7e752c0262324 Mon Sep 17 00:00:00 2001 From: latte Date: Wed, 25 Feb 2026 16:54:01 +0100 Subject: [PATCH] Add OAuth2/OIDC per-user Gitea authentication Introduce a GiteaOAuthValidator for JWT and userinfo validation and fallbacks, add /oauth/token proxy, and thread per-user tokens through the request context and automation paths. Update config and .env.example for OAuth-first mode, add OpenAPI, extensive unit/integration tests, GitHub/Gitea CI workflows, docs, and lint/test enforcement (>=80% cov). --- .claude/settings.local.json | 8 + .env.example | 41 +-- .gitea/workflows/docker.yml | 74 +++++ .gitea/workflows/lint.yml | 34 +++ .gitea/workflows/test.yml | 33 ++ Makefile | 4 +- README.md | 182 +++++++---- docker/Dockerfile | 4 + docs/api-reference.md | 86 ++++-- docs/configuration.md | 110 +++---- docs/deployment.md | 40 +-- docs/security.md | 59 ++-- openapi-gpt.yaml | 109 +++++++ pyproject.toml | 5 +- requirements.txt | 2 + src/aegis_gitea_mcp/automation.py | 17 +- src/aegis_gitea_mcp/config.py | 65 +++- src/aegis_gitea_mcp/gitea_client.py | 24 +- src/aegis_gitea_mcp/mcp_protocol.py | 2 +- src/aegis_gitea_mcp/oauth.py | 366 ++++++++++++++++++++++ src/aegis_gitea_mcp/request_context.py | 40 +++ src/aegis_gitea_mcp/server.py | 265 ++++++++++++++-- tests/conftest.py | 19 +- tests/test_automation.py | 25 +- tests/test_automation_manager.py | 140 +++++++++ tests/test_gitea_client.py | 168 +++++++++++ tests/test_integration.py | 255 +++++----------- tests/test_oauth.py | 379 +++++++++++++++++++++++ tests/test_oauth_oidc.py | 150 +++++++++ tests/test_repository_tools.py | 128 ++++++++ tests/test_server.py | 401 +++++++++++++------------ 31 files changed, 2575 insertions(+), 660 deletions(-) create mode 100644 .claude/settings.local.json create mode 100644 .gitea/workflows/docker.yml create mode 100644 .gitea/workflows/lint.yml create mode 100644 .gitea/workflows/test.yml create mode 100644 openapi-gpt.yaml create mode 100644 src/aegis_gitea_mcp/oauth.py create mode 100644 tests/test_automation_manager.py create mode 100644 tests/test_gitea_client.py create mode 100644 tests/test_oauth.py create mode 100644 tests/test_oauth_oidc.py create mode 100644 tests/test_repository_tools.py diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..227e67a --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,8 @@ +{ + "permissions": { + "allow": [ + "Bash(python -m pytest:*)", + "Bash(python:*)" + ] + } +} diff --git a/.env.example b/.env.example index d1b81c2..d866ebe 100644 --- a/.env.example +++ b/.env.example @@ -1,20 +1,29 @@ -# Runtime Environment +# Runtime environment ENVIRONMENT=production -# Gitea Configuration -GITEA_URL=https://gitea.example.com -GITEA_TOKEN=your-bot-user-token-here +# Gitea OAuth/OIDC resource server +GITEA_URL=https://git.hiddenden.cafe -# MCP Server Configuration -# Secure default: bind only localhost unless explicitly overridden. +# OAuth mode (recommended and required for per-user repository isolation) +OAUTH_MODE=true +GITEA_OAUTH_CLIENT_ID=your-gitea-oauth-client-id +GITEA_OAUTH_CLIENT_SECRET=your-gitea-oauth-client-secret +# Optional explicit audience override; defaults to GITEA_OAUTH_CLIENT_ID +OAUTH_EXPECTED_AUDIENCE= +# OIDC discovery and JWKS cache TTL +OAUTH_CACHE_TTL_SECONDS=300 + +# MCP server configuration MCP_HOST=127.0.0.1 MCP_PORT=8080 ALLOW_INSECURE_BIND=false -# Authentication Configuration (REQUIRED unless AUTH_ENABLED=false) -AUTH_ENABLED=true -MCP_API_KEYS=your-generated-api-key-here -# MCP_API_KEYS=key1,key2,key3 +# Logging / observability +LOG_LEVEL=INFO +AUDIT_LOG_PATH=/var/log/aegis-mcp/audit.log +METRICS_ENABLED=true +EXPOSE_ERROR_DETAILS=false +STARTUP_VALIDATE_GITEA=true # Authentication failure controls MAX_AUTH_FAILURES=5 @@ -24,13 +33,6 @@ AUTH_FAILURE_WINDOW=300 RATE_LIMIT_PER_MINUTE=60 TOKEN_RATE_LIMIT_PER_MINUTE=120 -# Logging / observability -LOG_LEVEL=INFO -AUDIT_LOG_PATH=/var/log/aegis-mcp/audit.log -METRICS_ENABLED=true -EXPOSE_ERROR_DETAILS=false -STARTUP_VALIDATE_GITEA=true - # Tool output limits MAX_FILE_SIZE_BYTES=1048576 MAX_TOOL_RESPONSE_ITEMS=200 @@ -50,3 +52,8 @@ WRITE_ALLOW_ALL_TOKEN_REPOS=false AUTOMATION_ENABLED=false AUTOMATION_SCHEDULER_ENABLED=false AUTOMATION_STALE_DAYS=30 + +# Legacy compatibility (not used for OAuth-protected MCP tool execution) +# GITEA_TOKEN= +# MCP_API_KEYS= +# AUTH_ENABLED=true diff --git a/.gitea/workflows/docker.yml b/.gitea/workflows/docker.yml new file mode 100644 index 0000000..5ad3c4f --- /dev/null +++ b/.gitea/workflows/docker.yml @@ -0,0 +1,74 @@ +name: docker + +on: + push: + pull_request: + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements-dev.txt + - name: Run lint + run: | + ruff check src tests + ruff format --check src tests + black --check src tests + mypy src + + test: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements-dev.txt + - name: Run tests + run: pytest --cov=aegis_gitea_mcp --cov-report=term-missing --cov-fail-under=80 + + docker-build: + runs-on: ubuntu-latest + needs: [lint, test] + env: + IMAGE_NAME: aegis-gitea-mcp + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Build image tagged with commit SHA + run: | + SHA_TAG="${GITHUB_SHA:-${CI_COMMIT_SHA:-local}}" + docker build -f docker/Dockerfile -t ${IMAGE_NAME}:${SHA_TAG} . + + - name: Tag latest on main + run: | + REF_NAME="${GITHUB_REF_NAME:-${CI_COMMIT_REF_NAME:-}}" + SHA_TAG="${GITHUB_SHA:-${CI_COMMIT_SHA:-local}}" + if [ "${REF_NAME}" = "main" ]; then + docker tag ${IMAGE_NAME}:${SHA_TAG} ${IMAGE_NAME}:latest + fi + + - name: Optional registry push + if: ${{ vars.PUSH_IMAGE == 'true' }} + run: | + SHA_TAG="${GITHUB_SHA:-${CI_COMMIT_SHA:-local}}" + docker push ${IMAGE_NAME}:${SHA_TAG} + REF_NAME="${GITHUB_REF_NAME:-${CI_COMMIT_REF_NAME:-}}" + if [ "${REF_NAME}" = "main" ]; then + docker push ${IMAGE_NAME}:latest + fi diff --git a/.gitea/workflows/lint.yml b/.gitea/workflows/lint.yml new file mode 100644 index 0000000..16bcff0 --- /dev/null +++ b/.gitea/workflows/lint.yml @@ -0,0 +1,34 @@ +name: lint + +on: + push: + pull_request: + +jobs: + lint: + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements-dev.txt + + - name: Run Ruff (fail on any diagnostics) + run: ruff check src tests + + - name: Enforce formatting + run: | + ruff format --check src tests + black --check src tests + + - name: Run mypy + run: mypy src diff --git a/.gitea/workflows/test.yml b/.gitea/workflows/test.yml new file mode 100644 index 0000000..573801f --- /dev/null +++ b/.gitea/workflows/test.yml @@ -0,0 +1,33 @@ +name: test + +on: + push: + pull_request: + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements-dev.txt + + - name: Run lint + run: | + ruff check src tests + ruff format --check src tests + black --check src tests + + - name: Run tests with coverage gate + run: | + pytest --cov=aegis_gitea_mcp --cov-report=term-missing --cov-fail-under=80 diff --git a/Makefile b/Makefile index 86a6fc5..6ee8e7b 100644 --- a/Makefile +++ b/Makefile @@ -35,10 +35,12 @@ install-dev: pre-commit install test: - pytest tests/ -v --cov=aegis_gitea_mcp --cov-report=html --cov-report=term + pytest tests/ -v --cov=aegis_gitea_mcp --cov-report=html --cov-report=term --cov-fail-under=80 lint: ruff check src/ tests/ + ruff format --check src/ tests/ + black --check src/ tests/ mypy src/ format: diff --git a/README.md b/README.md index 12914fb..57a89a8 100644 --- a/README.md +++ b/README.md @@ -1,85 +1,149 @@ # AegisGitea-MCP -Security-first, policy-driven MCP gateway for Gitea. +Security-first MCP server for self-hosted Gitea with per-user OAuth2/OIDC authentication. -AegisGitea-MCP exposes controlled read and optional write capabilities to AI agents through MCP-compatible endpoints, with strict validation, policy enforcement, tamper-evident audit logging, and secure-by-default runtime controls. +AegisGitea-MCP exposes MCP tools over HTTP/SSE and validates each user token against Gitea so tool access follows each user's actual repository permissions. -## Highlights +## Securing MCP with Gitea OAuth -- Security-first defaults (localhost bind, write mode disabled, no stack traces in production errors). -- YAML policy engine with global/per-repository tool allow/deny and optional path restrictions. -- Expanded read tools for repositories, commits, diffs, issues, PRs, labels, tags, and releases. -- Strict write mode (opt-in + policy enforcement, with whitelist by default). -- Tamper-evident audit logging with hash-chain integrity validation. -- Secret detection/sanitization for outbound payloads. -- Structured JSON logging + Prometheus metrics. -- Hardened Docker runtime (non-root, no-new-privileges, capability drop, read-only where practical). +### 1) Create a Gitea OAuth2 application -## Quick Start +1. Open `https://git.hiddenden.cafe/user/settings/applications` (or admin application settings). +2. Create an OAuth2 app. +3. Set redirect URI to the ChatGPT callback URL shown after creating a New App. +4. Save the app and keep: + - `Client ID` + - `Client Secret` -### 1. Install dependencies +Required scopes: +- `read:repository` +- `write:repository` (only needed when using write tools) -```bash -make install-dev -``` - -### 2. Configure environment +### 2) Configure this MCP server ```bash cp .env.example .env ``` -Set at minimum: -- `GITEA_URL` -- `GITEA_TOKEN` -- `MCP_API_KEYS` +Set OAuth-first values: -### 3. Run locally - -```bash -make run +```env +GITEA_URL=https://git.hiddenden.cafe +OAUTH_MODE=true +GITEA_OAUTH_CLIENT_ID= +GITEA_OAUTH_CLIENT_SECRET= +OAUTH_EXPECTED_AUDIENCE= ``` -Server defaults to `127.0.0.1:8080`. +### 3) Configure ChatGPT New App -## Core Commands +In ChatGPT New App: -- `make test`: run pytest with coverage. -- `make lint`: run Ruff + mypy. -- `make format`: run Black + Ruff autofix. -- `make docker-up`: start hardened prod-profile container. -- `make docker-down`: stop containers. -- `make validate-audit`: validate audit hash chain integrity. +- MCP server URL: `https:///mcp/sse` +- Authentication: OAuth +- OAuth client ID: Gitea OAuth app client ID +- OAuth client secret: Gitea OAuth app client secret -## Security Model +After creation, copy the ChatGPT callback URL and add it to the Gitea OAuth app redirect URIs. -- Authentication: API keys (`Authorization: Bearer `). -- Authorization: policy engine (`policy.yaml`) evaluated before tool execution. -- Rate limiting: per-IP and per-token. -- Output controls: bounded response size and optional secret masking/blocking. -- Write controls: `WRITE_MODE=false` by default; when enabled, use whitelist or opt into `WRITE_ALLOW_ALL_TOKEN_REPOS=true`. +### 4) OAuth-protected MCP behavior + +The server publishes protected-resource metadata: + +- `GET /.well-known/oauth-protected-resource` + +Example response: + +```json +{ + "resource": "https://git.hiddenden.cafe", + "authorization_servers": ["https://git.hiddenden.cafe"], + "bearer_methods_supported": ["header"], + "scopes_supported": ["read:repository", "write:repository"], + "resource_documentation": "https://hiddenden.cafe/docs/mcp-gitea" +} +``` + +If a tool call is missing/invalid auth, MCP endpoints return `401` with: + +```http +WWW-Authenticate: Bearer resource_metadata="https:///.well-known/oauth-protected-resource", scope="read:repository" +``` + +## Architecture + +```text +ChatGPT App + -> Authorization Code Flow + -> Gitea OAuth2/OIDC (issuer: https://git.hiddenden.cafe) + -> Access token + -> MCP Server (/mcp/sse, /mcp/tool/call) + -> OIDC discovery + JWKS cache + -> Scope enforcement (read:repository / write:repository) + -> Per-request Gitea API calls with Authorization: Bearer +``` + +## Example curl + +Protected resource metadata: + +```bash +curl -s https:///.well-known/oauth-protected-resource | jq +``` + +Expected 401 challenge when missing token: + +```bash +curl -i https:///mcp/tool/call \ + -H "Content-Type: application/json" \ + -d '{"tool":"list_repositories","arguments":{}}' +``` + +Authenticated tool call: + +```bash +curl -s https:///mcp/tool/call \ + -H "Authorization: Bearer " \ + -H "Content-Type: application/json" \ + -d '{"tool":"list_repositories","arguments":{}}' +``` + +## Threat model + +- Shared bot tokens are dangerous: + - one leaked token can expose all repositories reachable by that bot account. + - blast radius is repository-wide and cross-user. +- Token-in-URL is insecure: + - URLs leak via logs, proxies, browser history, and referers. + - bearer tokens must be sent in `Authorization` headers only. +- Per-user OAuth reduces lateral access: + - each call runs as the signed-in user. + - users only see repositories they already have permission for in Gitea. + +## CI/CD + +Gitea workflows were added under `.gitea/workflows/`: + +- `lint.yml`: Ruff + formatting + mypy. +- `test.yml`: lint + pytest + enforced coverage (`>=80%`). +- `docker.yml`: lint+test gated Docker build, SHA tag, `latest` tag on `main`. + +## Docker hardening + +`docker/Dockerfile` uses a multi-stage build, non-root runtime user, production env flags, minimal runtime dependencies, and a healthcheck. + +## Commands + +- `make test` +- `make lint` +- `make format` +- `make docker-build` +- `make docker-up` ## Documentation -All detailed docs are under `docs/`: - - `docs/api-reference.md` -- `docs/policy.md` - `docs/security.md` -- `docs/audit.md` -- `docs/write-mode.md` +- `docs/configuration.md` - `docs/deployment.md` -- `docs/observability.md` -- `docs/automation.md` -- `docs/governance.md` -- `docs/roadmap.md` -- `docs/todo.md` - -## Conduct and Governance - -- Contributor/maintainer conduct: `CODE_OF_CONDUCT.md` -- AI agent behavioral contract: `AGENTS.md` - -## License - -MIT (see `LICENSE`). +- `docs/write-mode.md` diff --git a/docker/Dockerfile b/docker/Dockerfile index 9ce599d..1fea3eb 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -5,6 +5,7 @@ FROM python:3.12-slim AS builder ENV PYTHONDONTWRITEBYTECODE=1 ENV PYTHONUNBUFFERED=1 +ENV PIP_DISABLE_PIP_VERSION_CHECK=1 WORKDIR /app @@ -20,6 +21,9 @@ FROM python:3.12-slim ENV PYTHONDONTWRITEBYTECODE=1 ENV PYTHONUNBUFFERED=1 +ENV PIP_DISABLE_PIP_VERSION_CHECK=1 +ENV NODE_ENV=production +ENV ENVIRONMENT=production ENV PATH=/home/aegis/.local/bin:$PATH ENV PYTHONPATH=/app/src:$PYTHONPATH diff --git a/docs/api-reference.md b/docs/api-reference.md index caadad9..2cdacd7 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -1,49 +1,69 @@ # API Reference -## Endpoints +## Core Endpoints - `GET /`: server metadata. - `GET /health`: health probe. - `GET /metrics`: Prometheus metrics (when enabled). -- `POST /automation/webhook`: ingest policy-controlled webhook events. -- `POST /automation/jobs/run`: run policy-controlled automation jobs. + +## OAuth Discovery and Token Exchange + +- `GET /.well-known/oauth-protected-resource` + - Returns OAuth protected resource metadata used by MCP clients. +- `GET /.well-known/oauth-authorization-server` + - Returns OAuth authorization server metadata. +- `POST /oauth/token` + - Proxies OAuth authorization-code token exchange to Gitea. + +## MCP Endpoints + - `GET /mcp/tools`: list tool definitions. -- `POST /mcp/tool/call`: execute a tool (`Authorization: Bearer ` required except in explicitly disabled auth mode). +- `POST /mcp/tool/call`: execute a tool. - `GET /mcp/sse` and `POST /mcp/sse`: MCP SSE transport. -## Automation Jobs +Authentication requirements: -`POST /automation/jobs/run` supports: -- `dependency_hygiene_scan` (read-only scaffold). -- `stale_issue_detection` (read-only issue age analysis). -- `auto_issue_creation` (write-mode + whitelist + policy required). +- MCP tool execution requires `Authorization: Bearer `. +- Missing or invalid tokens return `401` with: + - `WWW-Authenticate: Bearer resource_metadata="", scope="read:repository"` + +Scope requirements: + +- Read tools require `read:repository`. +- Write tools require `write:repository`. +- Insufficient scope returns `403`. + +## Automation Endpoints + +- `POST /automation/webhook`: ingest policy-controlled webhook events. +- `POST /automation/jobs/run`: run policy-controlled automation jobs. ## Read Tools -- `list_repositories`. -- `get_repository_info` (`owner`, `repo`). -- `get_file_tree` (`owner`, `repo`, optional `ref`, `recursive`). -- `get_file_contents` (`owner`, `repo`, `filepath`, optional `ref`). -- `search_code` (`owner`, `repo`, `query`, optional `ref`, `page`, `limit`). -- `list_commits` (`owner`, `repo`, optional `ref`, `page`, `limit`). -- `get_commit_diff` (`owner`, `repo`, `sha`). -- `compare_refs` (`owner`, `repo`, `base`, `head`). -- `list_issues` (`owner`, `repo`, optional `state`, `page`, `limit`, `labels`). -- `get_issue` (`owner`, `repo`, `issue_number`). -- `list_pull_requests` (`owner`, `repo`, optional `state`, `page`, `limit`). -- `get_pull_request` (`owner`, `repo`, `pull_number`). -- `list_labels` (`owner`, `repo`, optional `page`, `limit`). -- `list_tags` (`owner`, `repo`, optional `page`, `limit`). -- `list_releases` (`owner`, `repo`, optional `page`, `limit`). +- `list_repositories` +- `get_repository_info` (`owner`, `repo`) +- `get_file_tree` (`owner`, `repo`, optional `ref`, `recursive`) +- `get_file_contents` (`owner`, `repo`, `filepath`, optional `ref`) +- `search_code` (`owner`, `repo`, `query`, optional `ref`, `page`, `limit`) +- `list_commits` (`owner`, `repo`, optional `ref`, `page`, `limit`) +- `get_commit_diff` (`owner`, `repo`, `sha`) +- `compare_refs` (`owner`, `repo`, `base`, `head`) +- `list_issues` (`owner`, `repo`, optional `state`, `page`, `limit`, `labels`) +- `get_issue` (`owner`, `repo`, `issue_number`) +- `list_pull_requests` (`owner`, `repo`, optional `state`, `page`, `limit`) +- `get_pull_request` (`owner`, `repo`, `pull_number`) +- `list_labels` (`owner`, `repo`, optional `page`, `limit`) +- `list_tags` (`owner`, `repo`, optional `page`, `limit`) +- `list_releases` (`owner`, `repo`, optional `page`, `limit`) ## Write Tools (Write Mode Required) -- `create_issue` (`owner`, `repo`, `title`, optional `body`, `labels`, `assignees`). -- `update_issue` (`owner`, `repo`, `issue_number`, one or more of `title`, `body`, `state`). -- `create_issue_comment` (`owner`, `repo`, `issue_number`, `body`). -- `create_pr_comment` (`owner`, `repo`, `pull_number`, `body`). -- `add_labels` (`owner`, `repo`, `issue_number`, `labels`). -- `assign_issue` (`owner`, `repo`, `issue_number`, `assignees`). +- `create_issue` (`owner`, `repo`, `title`, optional `body`, `labels`, `assignees`) +- `update_issue` (`owner`, `repo`, `issue_number`, one or more of `title`, `body`, `state`) +- `create_issue_comment` (`owner`, `repo`, `issue_number`, `body`) +- `create_pr_comment` (`owner`, `repo`, `pull_number`, `body`) +- `add_labels` (`owner`, `repo`, `issue_number`, `labels`) +- `assign_issue` (`owner`, `repo`, `issue_number`, `assignees`) ## Validation and Limits @@ -54,8 +74,8 @@ ## Error Model -- Policy denial: HTTP `403`. -- Validation error: HTTP `400`. - Auth error: HTTP `401`. +- Policy/scope denial: HTTP `403`. +- Validation error: HTTP `400`. - Rate limit: HTTP `429`. -- Internal errors: HTTP `500` without stack traces in production. +- Internal errors: HTTP `500` (no stack traces in production). diff --git a/docs/configuration.md b/docs/configuration.md index b2c337f..0e91104 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1,23 +1,21 @@ # Configuration -All configuration is done through environment variables. Copy `.env.example` to `.env` and set the values before starting the server. +Copy `.env.example` to `.env` and set values before starting: ```bash cp .env.example .env ``` ---- - -## Gitea Settings +## OAuth/OIDC Settings (Primary) | Variable | Required | Default | Description | |---|---|---|---| -| `GITEA_URL` | Yes | — | Base URL of your Gitea instance (e.g. `https://gitea.example.com`) | -| `GITEA_TOKEN` | Yes | — | API token of the Gitea bot user | - -The `GITEA_TOKEN` must be a token belonging to a user that has at least read access to all repositories you want the AI to access. The server validates the token on startup by calling the Gitea `/api/v1/user` endpoint. - ---- +| `GITEA_URL` | Yes | - | Base URL of your Gitea instance | +| `OAUTH_MODE` | No | `false` | Enables OAuth-oriented validation settings | +| `GITEA_OAUTH_CLIENT_ID` | Yes when `OAUTH_MODE=true` | - | OAuth client id | +| `GITEA_OAUTH_CLIENT_SECRET` | Yes when `OAUTH_MODE=true` | - | OAuth client secret | +| `OAUTH_EXPECTED_AUDIENCE` | No | empty | Expected JWT audience; defaults to client id | +| `OAUTH_CACHE_TTL_SECONDS` | No | `300` | OIDC discovery/JWKS cache TTL | ## MCP Server Settings @@ -25,84 +23,44 @@ The `GITEA_TOKEN` must be a token belonging to a user that has at least read acc |---|---|---|---| | `MCP_HOST` | No | `127.0.0.1` | Interface to bind to | | `MCP_PORT` | No | `8080` | Port to listen on | -| `MCP_DOMAIN` | No | — | Public domain name (used for Traefik labels in Docker) | -| `LOG_LEVEL` | No | `INFO` | Log level: `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL` | -| `STARTUP_VALIDATE_GITEA` | No | `true` | Validate Gitea token and connectivity at startup via `/api/v1/user` | +| `ALLOW_INSECURE_BIND` | No | `false` | Explicit opt-in required for `0.0.0.0` bind | +| `LOG_LEVEL` | No | `INFO` | `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL` | +| `STARTUP_VALIDATE_GITEA` | No | `true` | Validate OIDC discovery endpoint at startup | -If startup validation fails with `403 Forbidden`, the token is authenticated but lacks permission to access `/api/v1/user`. Grant the bot user token the required API scope/permissions, or temporarily set `STARTUP_VALIDATE_GITEA=false` in controlled troubleshooting environments. - ---- - -## Authentication Settings +## Security and Limits | Variable | Required | Default | Description | |---|---|---|---| -| `AUTH_ENABLED` | No | `true` | Enable or disable API key authentication | -| `MCP_API_KEYS` | Yes (if auth enabled) | — | Comma-separated list of valid API keys | -| `MAX_AUTH_FAILURES` | No | `5` | Number of failed attempts before rate limiting an IP | -| `AUTH_FAILURE_WINDOW` | No | `300` | Time window in seconds for counting failures | +| `MAX_AUTH_FAILURES` | No | `5` | Failed auth attempts before rate limiting | +| `AUTH_FAILURE_WINDOW` | No | `300` | Window in seconds for auth failure counting | +| `RATE_LIMIT_PER_MINUTE` | No | `60` | Per-IP request limit | +| `TOKEN_RATE_LIMIT_PER_MINUTE` | No | `120` | Per-token request limit | +| `MAX_FILE_SIZE_BYTES` | No | `1048576` | Max file payload returned by read tools | +| `MAX_TOOL_RESPONSE_ITEMS` | No | `200` | Max list items in tool responses | +| `MAX_TOOL_RESPONSE_CHARS` | No | `20000` | Max chars in text fields | +| `REQUEST_TIMEOUT_SECONDS` | No | `30` | Upstream timeout for Gitea calls | +| `SECRET_DETECTION_MODE` | No | `mask` | `off`, `mask`, `block` | -### API Key Requirements - -- Minimum length: 32 characters -- Recommended: generate with `make generate-key` (produces 64-character hex keys) -- Multiple keys: separate with commas — useful during key rotation - -```env -# Single key -MCP_API_KEYS=abc123... - -# Multiple keys (grace period during rotation) -MCP_API_KEYS=newkey123...,oldkey456... -``` - -> **Warning:** Setting `AUTH_ENABLED=false` disables all authentication. Only do this in isolated development environments. - ---- - -## File Access Settings +## Write Mode | Variable | Required | Default | Description | |---|---|---|---| -| `MAX_FILE_SIZE_BYTES` | No | `1048576` | Maximum file size the server will return (bytes). Default: 1 MB | -| `REQUEST_TIMEOUT_SECONDS` | No | `30` | Timeout for upstream Gitea API calls (seconds) | +| `WRITE_MODE` | No | `false` | Enables write tools | +| `WRITE_REPOSITORY_WHITELIST` | Required if write mode enabled and allow-all disabled | empty | Comma-separated `owner/repo` allow list | +| `WRITE_ALLOW_ALL_TOKEN_REPOS` | No | `false` | Allow all repos accessible by token | ---- - -## Audit Logging Settings +## Automation | Variable | Required | Default | Description | |---|---|---|---| -| `AUDIT_LOG_PATH` | No | `/var/log/aegis-mcp/audit.log` | Absolute path for the JSON audit log file | +| `AUTOMATION_ENABLED` | No | `false` | Enables automation endpoints | +| `AUTOMATION_SCHEDULER_ENABLED` | No | `false` | Enables scheduler loop | +| `AUTOMATION_STALE_DAYS` | No | `30` | Age threshold for stale issue checks | -The directory is created automatically if it does not exist (requires write permission). +## Legacy Compatibility Variables ---- +These are retained for compatibility but not used for OAuth-protected MCP tool execution: -## Full Example - -```env -# Gitea -GITEA_URL=https://gitea.example.com -GITEA_TOKEN=abcdef1234567890abcdef1234567890 - -# Server -MCP_HOST=127.0.0.1 -MCP_PORT=8080 -MCP_DOMAIN=mcp.example.com -LOG_LEVEL=INFO -STARTUP_VALIDATE_GITEA=true - -# Auth -AUTH_ENABLED=true -MCP_API_KEYS=a1b2c3d4e5f6...64chars -MAX_AUTH_FAILURES=5 -AUTH_FAILURE_WINDOW=300 - -# Limits -MAX_FILE_SIZE_BYTES=1048576 -REQUEST_TIMEOUT_SECONDS=30 - -# Audit -AUDIT_LOG_PATH=/var/log/aegis-mcp/audit.log -``` +- `GITEA_TOKEN` +- `MCP_API_KEYS` +- `AUTH_ENABLED` diff --git a/docs/deployment.md b/docs/deployment.md index 0fac072..60679e5 100644 --- a/docs/deployment.md +++ b/docs/deployment.md @@ -2,26 +2,29 @@ ## Secure Defaults -- Default bind: `MCP_HOST=127.0.0.1`. -- Binding `0.0.0.0` requires explicit `ALLOW_INSECURE_BIND=true`. +- Default bind is `127.0.0.1`. +- Binding `0.0.0.0` requires `ALLOW_INSECURE_BIND=true`. - Write mode disabled by default. -- Policy file path configurable via `POLICY_FILE_PATH`. +- Policy checks run before tool execution. +- OAuth-protected MCP challenge responses are enabled by default for tool calls. ## Local Development ```bash make install-dev cp .env.example .env -make generate-key make run ``` ## Docker -- Use `docker/Dockerfile` (non-root runtime). -- Use compose profiles: - - `prod`: hardened runtime profile. - - `dev`: local development profile (localhost-only port bind). +Use `docker/Dockerfile`: + +- Multi-stage image build. +- Non-root runtime user. +- Production env flags (`NODE_ENV=production`, `ENVIRONMENT=production`). +- Only required app files copied. +- Healthcheck on `/health`. Run examples: @@ -30,17 +33,18 @@ docker compose --profile prod up -d docker compose --profile dev up -d ``` -## Environment Validation +## CI/CD (Gitea Workflows) -Startup validates: -- Required Gitea settings. -- API keys (when auth enabled). -- Insecure bind opt-in. -- Write whitelist when write mode enabled (unless `WRITE_ALLOW_ALL_TOKEN_REPOS=true`). +Workflows live in `.gitea/workflows/`: + +- `lint.yml`: ruff + format checks + mypy. +- `test.yml`: lint + tests + coverage fail-under `80`. +- `docker.yml`: gated Docker build (depends on lint+test), SHA tag, `latest` on `main`. ## Production Recommendations -- Run behind TLS-terminating reverse proxy. -- Restrict network exposure. -- Persist and rotate audit logs. -- Enable external monitoring for `/metrics`. +- Place MCP behind TLS reverse proxy. +- Restrict inbound traffic to expected clients. +- Persist and monitor audit logs. +- Monitor `/metrics` and auth-failure events. +- Rotate OAuth client credentials when required. diff --git a/docs/security.md b/docs/security.md index f11580f..c8e19bf 100644 --- a/docs/security.md +++ b/docs/security.md @@ -2,38 +2,57 @@ ## Core Controls -- API key authentication with constant-time comparison. -- Auth failure throttling. -- Per-IP and per-token request rate limits. -- Strict input validation via Pydantic schemas (`extra=forbid`). -- Policy engine authorization before tool execution. -- Secret detection with mask/block behavior. -- Production-safe error responses (no stack traces). +- OAuth2/OIDC bearer-token authentication for MCP tool execution. +- OIDC discovery + JWKS validation cache for JWT tokens. +- Userinfo validation fallback for opaque OAuth tokens. +- Scope enforcement: + - `read:repository` for read tools. + - `write:repository` for write tools. +- Policy engine checks before tool execution. +- Per-IP and per-token rate limiting. +- Strict schema validation (`extra=forbid`). +- Tamper-evident audit logging with hash chaining. +- Secret sanitization for logs and tool output. +- Production-safe error responses (no internal stack traces). + +## Threat Model + +### Why shared bot tokens are dangerous + +- A single leaked bot token can expose all repositories that bot can access. +- Access is not naturally bounded per end user. +- Blast radius is large and cross-tenant. + +### Why token-in-URL is insecure + +- URLs can be captured by reverse proxy logs, browser history, referer headers, and monitoring pipelines. +- Bearer tokens must be passed in `Authorization` headers only. + +### Why per-user OAuth reduces lateral access + +- Each MCP request executes with the signed-in user token. +- Gitea authorization stays source-of-truth for repository visibility. +- A compromised token is limited to that user’s permissions. ## Prompt Injection Hardening -Repository content is treated strictly as data. +Repository content is treated as untrusted data. - Tool outputs are bounded and sanitized. -- No instruction execution from repository text. -- Untrusted content handling helpers enforce maximum output size. +- No instructions from repository text are executed. +- Text fields are size-limited before returning to LLM clients. ## Secret Detection Detected classes include: -- API keys and generic token patterns. + +- API key and token patterns. - JWT-like tokens. - Private key block markers. -- Common provider token formats. +- Common provider credential formats. Behavior: + - `SECRET_DETECTION_MODE=mask`: redact in place. -- `SECRET_DETECTION_MODE=block`: replace secret-bearing field values. +- `SECRET_DETECTION_MODE=block`: replace secret-bearing values. - `SECRET_DETECTION_MODE=off`: disable sanitization (not recommended). - -## Authentication and Key Lifecycle - -- Keys must be at least 32 characters. -- Rotate keys regularly (`scripts/rotate_api_key.py`). -- Check key age and expiry (`scripts/check_key_age.py`). -- Prefer dedicated bot credentials with least privilege. diff --git a/openapi-gpt.yaml b/openapi-gpt.yaml new file mode 100644 index 0000000..d36f47d --- /dev/null +++ b/openapi-gpt.yaml @@ -0,0 +1,109 @@ +openapi: "3.1.0" +info: + title: AegisGitea MCP + description: > + AI access to your self-hosted Gitea instance via the AegisGitea MCP server. + Each user authenticates with their own Gitea account via OAuth2. + version: "0.2.0" + +servers: + - url: "https://YOUR_MCP_SERVER_DOMAIN" + description: > + Replace YOUR_MCP_SERVER_DOMAIN with the public hostname of your AegisGitea-MCP instance. + +components: + securitySchemes: + gitea_oauth: + type: oauth2 + flows: + authorizationCode: + # Replace YOUR_GITEA_DOMAIN with your self-hosted Gitea instance hostname. + authorizationUrl: "https://YOUR_GITEA_DOMAIN/login/oauth/authorize" + # The token URL must point to the MCP server's OAuth proxy endpoint. + tokenUrl: "https://YOUR_MCP_SERVER_DOMAIN/oauth/token" + scopes: + read: "Read access to Gitea repositories" + +security: + - gitea_oauth: + - read + +paths: + /mcp/tools: + get: + operationId: listTools + summary: List available MCP tools + description: Returns all tools available on this MCP server. Public endpoint, no authentication required. + security: [] + responses: + "200": + description: List of available MCP tools + content: + application/json: + schema: + type: object + properties: + tools: + type: array + items: + type: object + properties: + name: + type: string + description: + type: string + + /mcp/tool/call: + post: + operationId: callTool + summary: Execute an MCP tool + description: > + Execute a named MCP tool with the provided arguments. + The authenticated user's Gitea token is used for all Gitea API calls, + so only repositories and data accessible to the user will be returned. + security: + - gitea_oauth: + - read + requestBody: + required: true + content: + application/json: + schema: + type: object + required: + - tool + - arguments + properties: + tool: + type: string + description: Name of the MCP tool to execute + example: list_repositories + arguments: + type: object + description: Tool-specific arguments + example: {} + correlation_id: + type: string + description: Optional correlation ID for request tracing + responses: + "200": + description: Tool execution result + content: + application/json: + schema: + type: object + properties: + success: + type: boolean + result: + type: object + correlation_id: + type: string + "401": + description: Authentication required or token invalid + "403": + description: Policy denied the request + "404": + description: Tool not found + "429": + description: Rate limit exceeded diff --git a/pyproject.toml b/pyproject.toml index cdfa1de..c90aae6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,8 @@ dependencies = [ "PyYAML>=6.0.1", "python-dotenv>=1.0.0", "structlog>=24.1.0", + "python-multipart>=0.0.9", + "PyJWT[crypto]>=2.9.0", ] [project.optional-dependencies] @@ -104,7 +106,7 @@ disallow_untyped_defs = false [tool.pytest.ini_options] minversion = "7.0" -addopts = "-ra -q --strict-markers --cov=aegis_gitea_mcp --cov-report=term-missing" +addopts = "-ra -q --strict-markers --cov=aegis_gitea_mcp --cov-report=term-missing --cov-fail-under=80" testpaths = ["tests"] pythonpath = ["src"] asyncio_mode = "auto" @@ -114,6 +116,7 @@ source = ["src"] omit = ["tests/*", "**/__pycache__/*"] [tool.coverage.report] +fail_under = 80 exclude_lines = [ "pragma: no cover", "def __repr__", diff --git a/requirements.txt b/requirements.txt index aec577a..a614de2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,6 @@ pydantic>=2.5.0 pydantic-settings>=2.1.0 PyYAML>=6.0.1 python-dotenv>=1.0.0 +python-multipart>=0.0.9 structlog>=24.1.0 +PyJWT[crypto]>=2.9.0 diff --git a/src/aegis_gitea_mcp/automation.py b/src/aegis_gitea_mcp/automation.py index f137dec..91e528f 100644 --- a/src/aegis_gitea_mcp/automation.py +++ b/src/aegis_gitea_mcp/automation.py @@ -79,6 +79,7 @@ class AutomationManager: job_name: str, owner: str, repo: str, + user_token: str | None = None, finding_title: str | None = None, finding_body: str | None = None, ) -> dict[str, Any]: @@ -109,11 +110,12 @@ class AutomationManager: if job_name == "dependency_hygiene_scan": return await self._dependency_hygiene_scan(owner, repo) if job_name == "stale_issue_detection": - return await self._stale_issue_detection(owner, repo) + return await self._stale_issue_detection(owner, repo, user_token=user_token) if job_name == "auto_issue_creation": return await self._auto_issue_creation( owner, repo, + user_token=user_token, finding_title=finding_title, finding_body=finding_body, ) @@ -142,13 +144,17 @@ class AutomationManager: "findings": [], } - async def _stale_issue_detection(self, owner: str, repo: str) -> dict[str, Any]: + async def _stale_issue_detection( + self, owner: str, repo: str, user_token: str | None + ) -> dict[str, Any]: """Detect stale issues using repository issue metadata.""" repository = f"{owner}/{repo}" cutoff = datetime.now(timezone.utc) - timedelta(days=self.settings.automation_stale_days) + if not user_token: + raise AutomationError("missing authenticated user token") stale_issue_numbers: list[int] = [] - async with GiteaClient() as gitea: + async with GiteaClient(token=user_token) as gitea: issues = await gitea.list_issues( owner, repo, @@ -187,6 +193,7 @@ class AutomationManager: self, owner: str, repo: str, + user_token: str | None, finding_title: str | None, finding_body: str | None, ) -> dict[str, Any]: @@ -194,8 +201,10 @@ class AutomationManager: repository = f"{owner}/{repo}" title = finding_title or "Automated security finding" body = finding_body or "Automated finding created by Aegis automation workflow." + if not user_token: + raise AutomationError("missing authenticated user token") - async with GiteaClient() as gitea: + async with GiteaClient(token=user_token) as gitea: issue = await gitea.create_issue( owner, repo, diff --git a/src/aegis_gitea_mcp/config.py b/src/aegis_gitea_mcp/config.py index 3a14d14..aa5aebf 100644 --- a/src/aegis_gitea_mcp/config.py +++ b/src/aegis_gitea_mcp/config.py @@ -31,7 +31,10 @@ class Settings(BaseSettings): # Gitea configuration gitea_url: HttpUrl = Field(..., description="Base URL of the Gitea instance") - gitea_token: str = Field(..., description="Bot user access token for Gitea API", min_length=1) + gitea_token: str = Field( + default="", + description=("Deprecated shared bot token. Not used for MCP tool execution in OAuth mode."), + ) # MCP server configuration mcp_host: str = Field( @@ -96,6 +99,40 @@ class Settings(BaseSettings): description="Secret detection mode: off, mask, or block", ) + # OAuth2 configuration (for ChatGPT per-user Gitea authentication) + oauth_mode: bool = Field( + default=False, + description=( + "Enable per-user OAuth2 authentication mode. " + "When true, each ChatGPT user authenticates with their own Gitea account. " + "GITEA_TOKEN and MCP_API_KEYS are not required in this mode." + ), + ) + gitea_oauth_client_id: str = Field( + default="", + description="Gitea OAuth2 application client ID (required when oauth_mode=true)", + ) + gitea_oauth_client_secret: str = Field( + default="", + description="Gitea OAuth2 application client secret (required when oauth_mode=true)", + ) + oauth_expected_audience: str = Field( + default="", + description=( + "Expected OIDC audience for access tokens. " + "Defaults to GITEA_OAUTH_CLIENT_ID when unset." + ), + ) + oauth_cache_ttl_seconds: int = Field( + default=300, + description="OIDC discovery/JWKS cache TTL in seconds", + ge=30, + ) + oauth_resource_documentation: str = Field( + default="https://hiddenden.cafe/docs/mcp-gitea", + description="Public documentation URL for OAuth-protected MCP resource behavior", + ) + # Authentication configuration auth_enabled: bool = Field( default=True, @@ -170,10 +207,10 @@ class Settings(BaseSettings): @field_validator("gitea_token") @classmethod def validate_token_not_empty(cls, value: str) -> str: - """Validate Gitea token is non-empty and trimmed.""" + """Validate Gitea token is trimmed (empty string allowed for oauth_mode).""" cleaned = value.strip() - if not cleaned: - raise ValueError("gitea_token cannot be empty or whitespace") + if value and not cleaned: + raise ValueError("gitea_token cannot be whitespace-only") return cleaned @field_validator("secret_detection_mode") @@ -217,11 +254,21 @@ class Settings(BaseSettings): "Set ALLOW_INSECURE_BIND=true to explicitly permit this." ) - if self.auth_enabled and not parsed_keys: - raise ValueError( - "At least one API key must be configured when auth_enabled=True. " - "Set MCP_API_KEYS or disable auth explicitly for controlled testing." - ) + if self.oauth_mode: + # In OAuth mode, per-user Gitea tokens are used; no shared bot token or API keys needed. + if not self.gitea_oauth_client_id.strip(): + raise ValueError("GITEA_OAUTH_CLIENT_ID is required when OAUTH_MODE=true.") + if not self.gitea_oauth_client_secret.strip(): + raise ValueError("GITEA_OAUTH_CLIENT_SECRET is required when OAUTH_MODE=true.") + else: + # Standard API key mode: require bot token and at least one API key. + if not self.gitea_token.strip(): + raise ValueError("GITEA_TOKEN is required unless OAUTH_MODE=true.") + if self.auth_enabled and not parsed_keys: + raise ValueError( + "At least one API key must be configured when auth_enabled=True. " + "Set MCP_API_KEYS or disable auth explicitly for controlled testing." + ) # Enforce minimum key length to reduce brute-force success probability. for key in parsed_keys: diff --git a/src/aegis_gitea_mcp/gitea_client.py b/src/aegis_gitea_mcp/gitea_client.py index 686bace..0536047 100644 --- a/src/aegis_gitea_mcp/gitea_client.py +++ b/src/aegis_gitea_mcp/gitea_client.py @@ -19,7 +19,7 @@ class GiteaAuthenticationError(GiteaError): class GiteaAuthorizationError(GiteaError): - """Raised when bot user lacks permission for an operation.""" + """Raised when the authenticated user lacks permission for an operation.""" class GiteaNotFoundError(GiteaError): @@ -27,19 +27,21 @@ class GiteaNotFoundError(GiteaError): class GiteaClient: - """Client for interacting with Gitea API as a bot user.""" + """Client for interacting with Gitea API as the authenticated end-user.""" - def __init__(self, base_url: str | None = None, token: str | None = None) -> None: + def __init__(self, token: str, base_url: str | None = None) -> None: """Initialize Gitea client. Args: + token: OAuth access token for the authenticated user. base_url: Optional base URL override. - token: Optional token override. """ self.settings = get_settings() self.audit = get_audit_logger() self.base_url = (base_url or self.settings.gitea_base_url).rstrip("/") - self.token = token or self.settings.gitea_token + self.token = token.strip() + if not self.token: + raise ValueError("GiteaClient requires a non-empty per-user OAuth token") self.client: AsyncClient | None = None async def __aenter__(self) -> GiteaClient: @@ -47,7 +49,7 @@ class GiteaClient: self.client = AsyncClient( base_url=self.base_url, headers={ - "Authorization": f"token {self.token}", + "Authorization": f"Bearer {self.token}", "Content-Type": "application/json", }, timeout=self.settings.request_timeout_seconds, @@ -79,15 +81,15 @@ class GiteaClient: severity="high", metadata={"correlation_id": correlation_id}, ) - raise GiteaAuthenticationError("Authentication failed - check bot token") + raise GiteaAuthenticationError("Authentication failed - user token rejected") if response.status_code == 403: self.audit.log_access_denied( tool_name="gitea_api", - reason="bot user lacks permission", + reason="authenticated user lacks permission", correlation_id=correlation_id, ) - raise GiteaAuthorizationError("Bot user lacks permission for this operation") + raise GiteaAuthorizationError("Authenticated user lacks permission for this operation") if response.status_code == 404: raise GiteaNotFoundError("Resource not found") @@ -123,7 +125,7 @@ class GiteaClient: return self._handle_response(response, correlation_id) async def get_current_user(self) -> dict[str, Any]: - """Get current bot user profile.""" + """Get current authenticated user profile.""" correlation_id = self.audit.log_tool_invocation( tool_name="get_current_user", result_status="pending", @@ -146,7 +148,7 @@ class GiteaClient: raise async def list_repositories(self) -> list[dict[str, Any]]: - """List all repositories visible to the bot user.""" + """List repositories visible to the authenticated user.""" correlation_id = self.audit.log_tool_invocation( tool_name="list_repositories", result_status="pending", diff --git a/src/aegis_gitea_mcp/mcp_protocol.py b/src/aegis_gitea_mcp/mcp_protocol.py index 6bc5a1e..5fa0b63 100644 --- a/src/aegis_gitea_mcp/mcp_protocol.py +++ b/src/aegis_gitea_mcp/mcp_protocol.py @@ -55,7 +55,7 @@ def _tool( return MCPTool( name=name, description=description, - input_schema=schema, + inputSchema=schema, write_operation=write_operation, ) diff --git a/src/aegis_gitea_mcp/oauth.py b/src/aegis_gitea_mcp/oauth.py new file mode 100644 index 0000000..58b2893 --- /dev/null +++ b/src/aegis_gitea_mcp/oauth.py @@ -0,0 +1,366 @@ +"""OAuth2/OIDC token validation for per-user Gitea authentication.""" + +from __future__ import annotations + +import json +import time +from datetime import datetime, timezone +from typing import Any, cast + +import httpx +import jwt +from jwt import InvalidTokenError +from jwt.algorithms import RSAAlgorithm + +from aegis_gitea_mcp.audit import get_audit_logger +from aegis_gitea_mcp.config import get_settings + + +class OAuthTokenValidationError(RuntimeError): + """Raised when a provided OAuth token cannot be trusted.""" + + def __init__(self, public_message: str, reason: str) -> None: + """Initialize validation error details.""" + super().__init__(public_message) + self.public_message = public_message + self.reason = reason + + +class GiteaOAuthValidator: + """Validate per-user OAuth access tokens issued by Gitea.""" + + def __init__(self) -> None: + """Initialize OAuth validator state and caches.""" + self.settings = get_settings() + self.audit = get_audit_logger() + self._failed_attempts: dict[str, list[datetime]] = {} + self._discovery_cache: tuple[dict[str, Any], float] | None = None + self._jwks_cache: dict[str, tuple[dict[str, Any], float]] = {} + + @staticmethod + def extract_bearer_token(authorization_header: str | None) -> str | None: + """Extract token from `Authorization: Bearer ` header.""" + if not authorization_header: + return None + scheme, separator, token = authorization_header.partition(" ") + if separator != " " or scheme != "Bearer": + return None + stripped = token.strip() + if not stripped or " " in stripped: + return None + return stripped + + def _check_rate_limit(self, identifier: str) -> bool: + """Check whether authentication failures exceed configured threshold.""" + now = datetime.now(timezone.utc) + boundary = now.timestamp() - self.settings.auth_failure_window + + if identifier in self._failed_attempts: + self._failed_attempts[identifier] = [ + attempt + for attempt in self._failed_attempts[identifier] + if attempt.timestamp() > boundary + ] + + return len(self._failed_attempts.get(identifier, [])) < self.settings.max_auth_failures + + def _record_failed_attempt(self, identifier: str) -> None: + """Record a failed authentication attempt for rate limiting.""" + attempt_time = datetime.now(timezone.utc) + self._failed_attempts.setdefault(identifier, []).append(attempt_time) + + if len(self._failed_attempts[identifier]) >= self.settings.max_auth_failures: + self.audit.log_security_event( + event_type="oauth_rate_limit_exceeded", + description="OAuth authentication failure threshold exceeded", + severity="high", + metadata={ + "identifier": identifier, + "failure_count": len(self._failed_attempts[identifier]), + "window_seconds": self.settings.auth_failure_window, + }, + ) + + @staticmethod + def _looks_like_jwt(token: str) -> bool: + """Return True when token has JWT segment structure.""" + return token.count(".") == 2 + + @staticmethod + def _normalize_scopes(raw: Any) -> set[str]: + """Normalize scope claim variations to a set.""" + normalized: set[str] = set() + if isinstance(raw, str): + normalized.update(scope for scope in raw.split(" ") if scope) + elif isinstance(raw, list): + normalized.update(str(scope).strip() for scope in raw if str(scope).strip()) + return normalized + + def _extract_scopes(self, payload: dict[str, Any]) -> set[str]: + """Extract scopes from JWT or userinfo payload.""" + scopes = set() + scopes.update(self._normalize_scopes(payload.get("scope"))) + scopes.update(self._normalize_scopes(payload.get("scopes"))) + scopes.update(self._normalize_scopes(payload.get("scp"))) + return scopes + + async def _fetch_json_document(self, url: str) -> dict[str, Any]: + """Fetch a JSON document from a trusted OAuth endpoint.""" + try: + async with httpx.AsyncClient(timeout=self.settings.request_timeout_seconds) as client: + response = await client.get(url, headers={"Accept": "application/json"}) + except httpx.RequestError as exc: + raise OAuthTokenValidationError( + "Unable to validate OAuth token at this time.", + "oauth_network_error", + ) from exc + + if response.status_code != 200: + raise OAuthTokenValidationError( + "Unable to validate OAuth token at this time.", + "oauth_metadata_unavailable", + ) + + try: + data = response.json() + except ValueError as exc: + raise OAuthTokenValidationError( + "Unable to validate OAuth token at this time.", + "oauth_metadata_invalid_json", + ) from exc + + if not isinstance(data, dict): + raise OAuthTokenValidationError( + "Unable to validate OAuth token at this time.", + "oauth_metadata_invalid_type", + ) + return data + + async def _get_discovery_document(self) -> dict[str, Any]: + """Get cached OIDC discovery metadata.""" + now = time.monotonic() + if self._discovery_cache and now < self._discovery_cache[1]: + return self._discovery_cache[0] + + discovery_url = f"{self.settings.gitea_base_url}/.well-known/openid-configuration" + discovery = await self._fetch_json_document(discovery_url) + issuer = discovery.get("issuer") + jwks_uri = discovery.get("jwks_uri") + if not isinstance(issuer, str) or not issuer.strip(): + raise OAuthTokenValidationError( + "Unable to validate OAuth token at this time.", + "oauth_discovery_missing_issuer", + ) + if not isinstance(jwks_uri, str) or not jwks_uri.strip(): + raise OAuthTokenValidationError( + "Unable to validate OAuth token at this time.", + "oauth_discovery_missing_jwks_uri", + ) + + self._discovery_cache = (discovery, now + self.settings.oauth_cache_ttl_seconds) + return discovery + + async def _get_jwks(self, jwks_uri: str) -> dict[str, Any]: + """Get cached JWKS document.""" + now = time.monotonic() + cached = self._jwks_cache.get(jwks_uri) + if cached and now < cached[1]: + return cached[0] + + jwks = await self._fetch_json_document(jwks_uri) + keys = jwks.get("keys") + if not isinstance(keys, list) or not keys: + raise OAuthTokenValidationError( + "Unable to validate OAuth token at this time.", + "oauth_jwks_missing_keys", + ) + self._jwks_cache[jwks_uri] = (jwks, now + self.settings.oauth_cache_ttl_seconds) + return jwks + + async def _validate_jwt(self, token: str) -> dict[str, Any]: + """Validate JWT access token using OIDC discovery and JWKS.""" + discovery = await self._get_discovery_document() + issuer = str(discovery["issuer"]).rstrip("/") + jwks_uri = str(discovery["jwks_uri"]) + jwks = await self._get_jwks(jwks_uri) + + try: + header = jwt.get_unverified_header(token) + except InvalidTokenError as exc: + raise OAuthTokenValidationError( + "Invalid or expired OAuth token.", "oauth_jwt_header" + ) from exc + + algorithm = header.get("alg") + key_id = header.get("kid") + if algorithm != "RS256": + raise OAuthTokenValidationError("Invalid or expired OAuth token.", "oauth_jwt_alg") + if not isinstance(key_id, str) or not key_id.strip(): + raise OAuthTokenValidationError("Invalid or expired OAuth token.", "oauth_jwt_kid") + + matching_key = None + for key in jwks.get("keys", []): + if isinstance(key, dict) and key.get("kid") == key_id: + matching_key = key + break + if matching_key is None: + raise OAuthTokenValidationError( + "Invalid or expired OAuth token.", "oauth_jwt_key_not_found" + ) + + try: + public_key = RSAAlgorithm.from_jwk(json.dumps(matching_key)) + except Exception as exc: + raise OAuthTokenValidationError( + "Unable to validate OAuth token at this time.", + "oauth_jwt_invalid_jwk", + ) from exc + + expected_audience = ( + self.settings.oauth_expected_audience.strip() + or self.settings.gitea_oauth_client_id.strip() + ) + + decode_options = cast(Any, {"verify_aud": bool(expected_audience)}) + try: + claims = jwt.decode( + token, + key=cast(Any, public_key), + algorithms=["RS256"], + issuer=issuer, + audience=expected_audience or None, + options=decode_options, + ) + except InvalidTokenError as exc: + raise OAuthTokenValidationError( + "Invalid or expired OAuth token.", "oauth_jwt_invalid" + ) from exc + + if not isinstance(claims, dict): + raise OAuthTokenValidationError("Invalid or expired OAuth token.", "oauth_jwt_claims") + + scopes = self._extract_scopes(claims) + login = ( + str(claims.get("preferred_username", "")).strip() + or str(claims.get("name", "")).strip() + or str(claims.get("sub", "unknown")).strip() + ) + subject = str(claims.get("sub", login)).strip() or "unknown" + return { + "login": login, + "subject": subject, + "scopes": sorted(scopes), + } + + async def _validate_userinfo(self, token: str) -> dict[str, Any]: + """Validate token via Gitea userinfo endpoint (opaque token fallback).""" + userinfo_url = f"{self.settings.gitea_base_url}/login/oauth/userinfo" + try: + async with httpx.AsyncClient(timeout=self.settings.request_timeout_seconds) as client: + response = await client.get( + userinfo_url, + headers={ + "Authorization": f"Bearer {token}", + "Accept": "application/json", + }, + ) + except httpx.RequestError as exc: + raise OAuthTokenValidationError( + "Unable to validate OAuth token at this time.", + "oauth_userinfo_network", + ) from exc + + if response.status_code in {401, 403}: + raise OAuthTokenValidationError( + "Invalid or expired OAuth token.", "oauth_userinfo_denied" + ) + if response.status_code != 200: + raise OAuthTokenValidationError( + "Unable to validate OAuth token at this time.", + "oauth_userinfo_unavailable", + ) + + try: + payload = response.json() + except ValueError as exc: + raise OAuthTokenValidationError( + "Invalid or expired OAuth token.", "oauth_userinfo_json" + ) from exc + + if not isinstance(payload, dict): + raise OAuthTokenValidationError( + "Invalid or expired OAuth token.", "oauth_userinfo_type" + ) + + scopes = self._extract_scopes(payload) + login = ( + str(payload.get("preferred_username", "")).strip() + or str(payload.get("login", "")).strip() + or str(payload.get("name", "")).strip() + or str(payload.get("sub", "unknown")).strip() + ) + subject = str(payload.get("sub", login)).strip() or "unknown" + return { + "login": login, + "subject": subject, + "scopes": sorted(scopes), + } + + async def validate_oauth_token( + self, + token: str | None, + client_ip: str, + user_agent: str, + ) -> tuple[bool, str | None, dict[str, Any] | None]: + """Validate an incoming OAuth token and return principal context.""" + if not self._check_rate_limit(client_ip): + return False, "Too many failed authentication attempts. Try again later.", None + + if not token: + self._record_failed_attempt(client_ip) + return False, "Authorization header missing or empty.", None + + try: + if self._looks_like_jwt(token): + try: + principal = await self._validate_jwt(token) + except OAuthTokenValidationError: + # Some providers issue opaque access tokens; verify those via userinfo. + principal = await self._validate_userinfo(token) + else: + principal = await self._validate_userinfo(token) + except OAuthTokenValidationError as exc: + self._record_failed_attempt(client_ip) + self.audit.log_access_denied( + tool_name="oauth_authentication", + reason=exc.reason, + ) + return False, exc.public_message, None + + self.audit.log_tool_invocation( + tool_name="oauth_authentication", + result_status="success", + params={ + "client_ip": client_ip, + "user_agent": user_agent, + "gitea_user": principal.get("login", "unknown"), + }, + ) + return True, None, principal + + +_oauth_validator: GiteaOAuthValidator | None = None + + +def get_oauth_validator() -> GiteaOAuthValidator: + """Get or create the global OAuth validator instance.""" + global _oauth_validator + if _oauth_validator is None: + _oauth_validator = GiteaOAuthValidator() + return _oauth_validator + + +def reset_oauth_validator() -> None: + """Reset the global OAuth validator instance (primarily for testing).""" + global _oauth_validator + _oauth_validator = None diff --git a/src/aegis_gitea_mcp/request_context.py b/src/aegis_gitea_mcp/request_context.py index b508392..d562478 100644 --- a/src/aegis_gitea_mcp/request_context.py +++ b/src/aegis_gitea_mcp/request_context.py @@ -5,6 +5,9 @@ from __future__ import annotations from contextvars import ContextVar _REQUEST_ID: ContextVar[str] = ContextVar("request_id", default="-") +_GITEA_USER_TOKEN: ContextVar[str | None] = ContextVar("gitea_user_token", default=None) +_GITEA_USER_LOGIN: ContextVar[str | None] = ContextVar("gitea_user_login", default=None) +_GITEA_USER_SCOPES: ContextVar[tuple[str, ...]] = ContextVar("gitea_user_scopes", default=()) def set_request_id(request_id: str) -> None: @@ -15,3 +18,40 @@ def set_request_id(request_id: str) -> None: def get_request_id() -> str: """Get current request id from context-local state.""" return _REQUEST_ID.get() + + +def set_gitea_user_token(token: str) -> None: + """Store the per-request Gitea OAuth user token in context-local state.""" + _GITEA_USER_TOKEN.set(token) + + +def get_gitea_user_token() -> str | None: + """Get the per-request Gitea OAuth user token from context-local state.""" + return _GITEA_USER_TOKEN.get() + + +def set_gitea_user_login(login: str) -> None: + """Store the authenticated Gitea username in context-local state.""" + _GITEA_USER_LOGIN.set(login) + + +def get_gitea_user_login() -> str | None: + """Get the authenticated Gitea username from context-local state.""" + return _GITEA_USER_LOGIN.get() + + +def set_gitea_user_scopes(scopes: list[str] | set[str] | tuple[str, ...]) -> None: + """Store normalized OAuth scopes for the current request.""" + _GITEA_USER_SCOPES.set(tuple(sorted({scope.strip() for scope in scopes if scope.strip()}))) + + +def get_gitea_user_scopes() -> tuple[str, ...]: + """Get OAuth scopes attached to the current request.""" + return _GITEA_USER_SCOPES.get() + + +def clear_gitea_auth_context() -> None: + """Reset per-request Gitea authentication context values.""" + _GITEA_USER_TOKEN.set(None) + _GITEA_USER_LOGIN.set(None) + _GITEA_USER_SCOPES.set(()) diff --git a/src/aegis_gitea_mcp/server.py b/src/aegis_gitea_mcp/server.py index 4d078e5..83921bd 100644 --- a/src/aegis_gitea_mcp/server.py +++ b/src/aegis_gitea_mcp/server.py @@ -9,19 +9,15 @@ import uuid from collections.abc import AsyncGenerator, Awaitable, Callable from typing import Any +import httpx from fastapi import FastAPI, HTTPException, Request, Response from fastapi.responses import JSONResponse, PlainTextResponse, StreamingResponse from pydantic import BaseModel, Field, ValidationError from aegis_gitea_mcp.audit import get_audit_logger -from aegis_gitea_mcp.auth import get_validator from aegis_gitea_mcp.automation import AutomationError, AutomationManager from aegis_gitea_mcp.config import get_settings -from aegis_gitea_mcp.gitea_client import ( - GiteaAuthenticationError, - GiteaAuthorizationError, - GiteaClient, -) +from aegis_gitea_mcp.gitea_client import GiteaClient from aegis_gitea_mcp.logging_utils import configure_logging from aegis_gitea_mcp.mcp_protocol import ( AVAILABLE_TOOLS, @@ -30,10 +26,19 @@ from aegis_gitea_mcp.mcp_protocol import ( MCPToolCallResponse, get_tool_by_name, ) +from aegis_gitea_mcp.oauth import get_oauth_validator from aegis_gitea_mcp.observability import get_metrics_registry, monotonic_seconds from aegis_gitea_mcp.policy import PolicyError, get_policy_engine from aegis_gitea_mcp.rate_limit import get_rate_limiter -from aegis_gitea_mcp.request_context import set_request_id +from aegis_gitea_mcp.request_context import ( + clear_gitea_auth_context, + get_gitea_user_scopes, + get_gitea_user_token, + set_gitea_user_login, + set_gitea_user_scopes, + set_gitea_user_token, + set_request_id, +) from aegis_gitea_mcp.security import sanitize_data from aegis_gitea_mcp.tools.arguments import extract_repository, extract_target_path from aegis_gitea_mcp.tools.read_tools import ( @@ -66,6 +71,9 @@ from aegis_gitea_mcp.tools.write_tools import ( logger = logging.getLogger(__name__) +READ_SCOPE = "read:repository" +WRITE_SCOPE = "write:repository" + app = FastAPI( title="AegisGitea MCP Server", description="Security-first MCP server for controlled AI access to self-hosted Gitea", @@ -121,6 +129,32 @@ TOOL_HANDLERS: dict[str, ToolHandler] = { } +def _oauth_metadata_url(request: Request) -> str: + """Build absolute metadata URL for OAuth challenge responses.""" + return f"{str(request.base_url).rstrip('/')}/.well-known/oauth-protected-resource" + + +def _oauth_unauthorized_response( + request: Request, + message: str, + scope: str = READ_SCOPE, +) -> JSONResponse: + """Return RFC-compliant OAuth challenge response for protected MCP endpoints.""" + metadata_url = _oauth_metadata_url(request) + response = JSONResponse( + status_code=401, + content={ + "error": "Authentication failed", + "message": message, + "request_id": getattr(request.state, "request_id", "-"), + }, + ) + response.headers["WWW-Authenticate"] = ( + f'Bearer resource_metadata="{metadata_url}", scope="{scope}"' + ) + return response + + @app.middleware("http") async def request_context_middleware( request: Request, @@ -160,6 +194,7 @@ async def authenticate_and_rate_limit( call_next: Callable[[Request], Awaitable[Response]], ) -> Response: """Apply rate-limiting and authentication for MCP endpoints.""" + clear_gitea_auth_context() settings = get_settings() if request.url.path in {"/", "/health"}: @@ -169,21 +204,27 @@ async def authenticate_and_rate_limit( # Metrics endpoint is intentionally left unauthenticated for pull-based scraping. return await call_next(request) + # OAuth discovery and token endpoints must be public so ChatGPT can complete the flow. + if request.url.path in { + "/oauth/token", + "/.well-known/oauth-protected-resource", + "/.well-known/oauth-authorization-server", + }: + return await call_next(request) + if not (request.url.path.startswith("/mcp/") or request.url.path.startswith("/automation/")): return await call_next(request) - validator = get_validator() + oauth_validator = get_oauth_validator() limiter = get_rate_limiter() client_ip = request.client.host if request.client else "unknown" user_agent = request.headers.get("user-agent", "unknown") auth_header = request.headers.get("authorization") - api_key = validator.extract_bearer_token(auth_header) - if not api_key and request.url.path in {"/mcp/tool/call", "/mcp/sse"}: - api_key = request.query_params.get("api_key") + access_token = oauth_validator.extract_bearer_token(auth_header) - rate_limit = limiter.check(client_ip=client_ip, token=api_key) + rate_limit = limiter.check(client_ip=client_ip, token=access_token) if not rate_limit.allowed: return JSONResponse( status_code=429, @@ -198,18 +239,46 @@ async def authenticate_and_rate_limit( if request.url.path == "/mcp/tools": return await call_next(request) - is_valid, error_message = validator.validate_api_key(api_key, client_ip, user_agent) - if not is_valid: + if not access_token: + if request.url.path.startswith("/mcp/"): + return _oauth_unauthorized_response( + request, + "Provide Authorization: Bearer .", + scope=READ_SCOPE, + ) return JSONResponse( status_code=401, content={ "error": "Authentication failed", - "message": error_message, - "detail": "Provide Authorization: Bearer or ?api_key=", + "message": "Provide Authorization: Bearer .", "request_id": getattr(request.state, "request_id", "-"), }, ) + is_valid, error_message, user_data = await oauth_validator.validate_oauth_token( + access_token, client_ip, user_agent + ) + if not is_valid: + if request.url.path.startswith("/mcp/"): + return _oauth_unauthorized_response( + request, + error_message or "Invalid or expired OAuth token.", + scope=READ_SCOPE, + ) + return JSONResponse( + status_code=401, + content={ + "error": "Authentication failed", + "message": error_message or "Invalid or expired OAuth token.", + "request_id": getattr(request.state, "request_id", "-"), + }, + ) + + if user_data: + set_gitea_user_token(access_token) + set_gitea_user_login(str(user_data.get("login", "unknown"))) + set_gitea_user_scopes(user_data.get("scopes", [])) + return await call_next(request) @@ -240,23 +309,25 @@ async def startup_event() -> None: raise if settings.startup_validate_gitea and settings.environment != "test": + discovery_url = f"{settings.gitea_base_url}/.well-known/openid-configuration" try: - async with GiteaClient() as gitea: - user = await gitea.get_current_user() - logger.info("gitea_connected", extra={"bot_user": user.get("login", "unknown")}) - except GiteaAuthenticationError as exc: - logger.error("gitea_connection_failed_authentication") + async with httpx.AsyncClient(timeout=settings.request_timeout_seconds) as client: + response = await client.get(discovery_url, headers={"Accept": "application/json"}) + except httpx.RequestError as exc: + logger.error("gitea_oidc_discovery_request_failed") raise RuntimeError( - "Startup validation failed: Gitea authentication was rejected. Check GITEA_TOKEN." + "Startup validation failed: unable to reach Gitea OIDC discovery endpoint." ) from exc - except GiteaAuthorizationError as exc: - logger.error("gitea_connection_failed_authorization") + + if response.status_code != 200: + logger.error( + "gitea_oidc_discovery_non_200", extra={"status_code": response.status_code} + ) raise RuntimeError( - "Startup validation failed: Gitea token lacks permission for /api/v1/user." - ) from exc - except Exception as exc: - logger.error("gitea_connection_failed") - raise RuntimeError("Startup validation failed: unable to connect to Gitea.") from exc + "Startup validation failed: Gitea OIDC discovery endpoint returned non-200." + ) + + logger.info("gitea_oidc_discovery_ready", extra={"issuer": settings.gitea_base_url}) @app.on_event("shutdown") @@ -282,6 +353,108 @@ async def health() -> dict[str, str]: return {"status": "healthy"} +@app.get("/.well-known/oauth-protected-resource") +async def oauth_protected_resource_metadata() -> JSONResponse: + """OAuth 2.0 Protected Resource Metadata (RFC 9728). + + Required by the MCP Authorization spec so that OAuth clients (e.g. ChatGPT) + can discover the authorization server that protects this resource. + ChatGPT fetches this endpoint when it first connects to the MCP server via SSE. + """ + settings = get_settings() + gitea_base = settings.gitea_base_url + + return JSONResponse( + content={ + "resource": gitea_base, + "authorization_servers": [gitea_base], + "bearer_methods_supported": ["header"], + "scopes_supported": [READ_SCOPE, WRITE_SCOPE], + "resource_documentation": str(settings.oauth_resource_documentation), + } + ) + + +@app.get("/.well-known/oauth-authorization-server") +async def oauth_authorization_server_metadata(request: Request) -> JSONResponse: + """OAuth 2.0 Authorization Server Metadata (RFC 8414). + + Proxies Gitea's OAuth authorization server metadata so that ChatGPT can + discover the authorize URL, token URL, and supported features directly + from this server without needing to know the Gitea URL upfront. + """ + settings = get_settings() + base_url = str(request.base_url).rstrip("/") + gitea_base = settings.gitea_base_url + + return JSONResponse( + content={ + "issuer": gitea_base, + "authorization_endpoint": f"{gitea_base}/login/oauth/authorize", + "token_endpoint": f"{base_url}/oauth/token", + "response_types_supported": ["code"], + "grant_types_supported": ["authorization_code"], + "code_challenge_methods_supported": ["S256"], + "scopes_supported": [READ_SCOPE, WRITE_SCOPE], + "token_endpoint_auth_methods_supported": ["client_secret_post", "none"], + } + ) + + +@app.post("/oauth/token") +async def oauth_token_proxy(request: Request) -> JSONResponse: + """Proxy OAuth2 token exchange to Gitea. + + ChatGPT sends the authorization code here after the user logs in to Gitea. + This endpoint forwards the code to Gitea's token endpoint and returns the + access_token to ChatGPT, completing the OAuth2 Authorization Code flow. + """ + settings = get_settings() + + try: + form_data = await request.form() + except Exception as exc: + raise HTTPException(status_code=400, detail="Invalid request body") from exc + + code = form_data.get("code") + redirect_uri = form_data.get("redirect_uri", "") + # ChatGPT sends the client_id and client_secret (that were configured in the GPT Action + # settings) in the POST body. Use those directly; fall back to env vars if not provided. + client_id = form_data.get("client_id") or settings.gitea_oauth_client_id + client_secret = form_data.get("client_secret") or settings.gitea_oauth_client_secret + + if not code: + raise HTTPException(status_code=400, detail="Missing authorization code") + + gitea_token_url = f"{settings.gitea_base_url}/login/oauth/access_token" + payload = { + "client_id": client_id, + "client_secret": client_secret, + "code": code, + "grant_type": "authorization_code", + "redirect_uri": redirect_uri, + } + + try: + async with httpx.AsyncClient(timeout=30) as client: + response = await client.post( + gitea_token_url, + data=payload, + headers={"Accept": "application/json"}, + ) + except httpx.RequestError as exc: + logger.error("oauth_token_proxy_error", extra={"error": str(exc)}) + raise HTTPException(status_code=502, detail="Failed to reach Gitea token endpoint") from exc + + if response.status_code != 200: + raise HTTPException( + status_code=response.status_code, + detail="Token exchange failed with Gitea", + ) + + return JSONResponse(content=response.json()) + + @app.get("/metrics") async def metrics() -> PlainTextResponse: """Prometheus-compatible metrics endpoint.""" @@ -316,6 +489,7 @@ async def automation_run_job(request: AutomationJobRequest) -> JSONResponse: job_name=request.job_name, owner=request.owner, repo=request.repo, + user_token=get_gitea_user_token(), finding_title=request.finding_title, finding_body=request.finding_body, ) @@ -343,6 +517,19 @@ async def _execute_tool_call( if not tool_def: raise HTTPException(status_code=404, detail=f"Tool '{tool_name}' not found") + required_scope = WRITE_SCOPE if tool_def.write_operation else READ_SCOPE + granted_scopes = set(get_gitea_user_scopes()) + if required_scope not in granted_scopes: + audit.log_access_denied( + tool_name=tool_name, + reason=f"insufficient_scope:{required_scope}", + correlation_id=correlation_id, + ) + raise HTTPException( + status_code=403, + detail=f"Insufficient scope. Required scope: {required_scope}", + ) + handler = TOOL_HANDLERS.get(tool_name) if not handler: raise HTTPException( @@ -370,7 +557,11 @@ async def _execute_tool_call( status = "error" try: - async with GiteaClient() as gitea: + user_token = get_gitea_user_token() + if not user_token: + raise HTTPException(status_code=401, detail="Missing authenticated user token context") + + async with GiteaClient(token=user_token) as gitea: result = await handler(gitea, arguments) if settings.secret_detection_mode != "off": @@ -542,6 +733,20 @@ async def sse_message_handler(request: Request) -> JSONResponse: "result": {"content": [{"type": "text", "text": json.dumps(result)}]}, } ) + except HTTPException as exc: + audit.log_tool_invocation( + tool_name=str(tool_name), + correlation_id=correlation_id, + result_status="error", + error=str(exc.detail), + ) + return JSONResponse( + content={ + "jsonrpc": "2.0", + "id": message_id, + "error": {"code": -32000, "message": str(exc.detail)}, + } + ) except Exception as exc: audit.log_tool_invocation( tool_name=str(tool_name), diff --git a/tests/conftest.py b/tests/conftest.py index 88c155f..2868c39 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,6 +8,7 @@ import pytest from aegis_gitea_mcp.audit import reset_audit_logger from aegis_gitea_mcp.auth import reset_validator from aegis_gitea_mcp.config import reset_settings +from aegis_gitea_mcp.oauth import reset_oauth_validator from aegis_gitea_mcp.observability import reset_metrics_registry from aegis_gitea_mcp.policy import reset_policy_engine from aegis_gitea_mcp.rate_limit import reset_rate_limiter @@ -20,6 +21,7 @@ def reset_globals(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Generator[ reset_settings() reset_audit_logger() reset_validator() + reset_oauth_validator() reset_policy_engine() reset_rate_limiter() reset_metrics_registry() @@ -34,6 +36,7 @@ def reset_globals(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Generator[ reset_settings() reset_audit_logger() reset_validator() + reset_oauth_validator() reset_policy_engine() reset_rate_limiter() reset_metrics_registry() @@ -41,7 +44,7 @@ def reset_globals(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Generator[ @pytest.fixture def mock_env(monkeypatch: pytest.MonkeyPatch) -> None: - """Set up mock environment variables for testing.""" + """Set up mock environment variables for testing (standard API key mode).""" monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") monkeypatch.setenv("GITEA_TOKEN", "test-token-12345") monkeypatch.setenv("ENVIRONMENT", "test") @@ -50,3 +53,17 @@ def mock_env(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setenv("LOG_LEVEL", "DEBUG") monkeypatch.setenv("MCP_API_KEYS", "a" * 64) monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "false") + + +@pytest.fixture +def mock_env_oauth(monkeypatch: pytest.MonkeyPatch) -> None: + """Set up mock environment variables for OAuth mode testing.""" + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("ENVIRONMENT", "test") + monkeypatch.setenv("MCP_HOST", "127.0.0.1") + monkeypatch.setenv("MCP_PORT", "8080") + monkeypatch.setenv("LOG_LEVEL", "DEBUG") + monkeypatch.setenv("OAUTH_MODE", "true") + monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id") + monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret") + monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "false") diff --git a/tests/test_automation.py b/tests/test_automation.py index 1f34d80..97623d0 100644 --- a/tests/test_automation.py +++ b/tests/test_automation.py @@ -6,6 +6,21 @@ import pytest from fastapi.testclient import TestClient +@pytest.fixture +def allow_oauth(monkeypatch: pytest.MonkeyPatch) -> None: + """Mock OAuth validation to return a deterministic authenticated principal.""" + + async def _validate(_self, token, _ip, _ua): + if token == "a" * 64: + return True, None, {"login": "automation-user", "scopes": ["read:repository"]} + return False, "Invalid or expired OAuth token.", None + + monkeypatch.setattr( + "aegis_gitea_mcp.oauth.GiteaOAuthValidator.validate_oauth_token", + _validate, + ) + + def _set_base_env( monkeypatch: pytest.MonkeyPatch, automation_enabled: bool, policy_path: Path ) -> None: @@ -20,7 +35,7 @@ def _set_base_env( def test_automation_job_denied_when_disabled( - monkeypatch: pytest.MonkeyPatch, tmp_path: Path + monkeypatch: pytest.MonkeyPatch, tmp_path: Path, allow_oauth: None ) -> None: """Automation endpoints should deny requests when automation mode is disabled.""" policy_path = tmp_path / "policy.yaml" @@ -41,7 +56,7 @@ def test_automation_job_denied_when_disabled( def test_automation_job_executes_when_enabled( - monkeypatch: pytest.MonkeyPatch, tmp_path: Path + monkeypatch: pytest.MonkeyPatch, tmp_path: Path, allow_oauth: None ) -> None: """Dependency scan job should execute when automation is enabled and policy allows it.""" policy_path = tmp_path / "policy.yaml" @@ -74,7 +89,9 @@ tools: assert payload["result"]["job"] == "dependency_hygiene_scan" -def test_automation_webhook_policy_denied(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: +def test_automation_webhook_policy_denied( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path, allow_oauth: None +) -> None: """Webhook ingestion must respect policy deny rules.""" policy_path = tmp_path / "policy.yaml" policy_path.write_text( @@ -104,7 +121,7 @@ tools: def test_auto_issue_creation_denied_without_write_mode( - monkeypatch: pytest.MonkeyPatch, tmp_path: Path + monkeypatch: pytest.MonkeyPatch, tmp_path: Path, allow_oauth: None ) -> None: """Auto issue creation job should be denied unless write mode is enabled.""" policy_path = tmp_path / "policy.yaml" diff --git a/tests/test_automation_manager.py b/tests/test_automation_manager.py new file mode 100644 index 0000000..0d9bedf --- /dev/null +++ b/tests/test_automation_manager.py @@ -0,0 +1,140 @@ +"""Unit tests for automation manager job paths.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from aegis_gitea_mcp.automation import AutomationError, AutomationManager +from aegis_gitea_mcp.config import reset_settings +from aegis_gitea_mcp.policy import reset_policy_engine + + +class StubAutomationGiteaClient: + """Async context manager stub for automation jobs.""" + + def __init__(self, token: str, issues: list[dict] | None = None) -> None: + self.token = token + self._issues = issues or [] + + async def __aenter__(self): + return self + + async def __aexit__(self, *_args): + return None + + async def list_issues(self, owner, repo, *, state, page, limit, labels=None): + return self._issues + + async def create_issue(self, owner, repo, *, title, body, labels=None, assignees=None): + return {"number": 77, "title": title, "body": body} + + +@pytest.fixture(autouse=True) +def reset_globals() -> None: + """Reset singleton state between tests.""" + reset_settings() + reset_policy_engine() + yield + reset_settings() + reset_policy_engine() + + +@pytest.fixture +def automation_env(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> Path: + """Set environment for automation manager tests.""" + policy_path = tmp_path / "policy.yaml" + policy_path.write_text( + ( + "defaults:\n" + " read: allow\n" + " write: allow\n" + "tools:\n" + " allow:\n" + " - automation_stale_issue_detection\n" + " - automation_auto_issue_creation\n" + ), + encoding="utf-8", + ) + + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("GITEA_TOKEN", "legacy-token") + monkeypatch.setenv("MCP_API_KEYS", "a" * 64) + monkeypatch.setenv("ENVIRONMENT", "test") + monkeypatch.setenv("AUTOMATION_ENABLED", "true") + monkeypatch.setenv("POLICY_FILE_PATH", str(policy_path)) + monkeypatch.setenv("WRITE_MODE", "true") + monkeypatch.setenv("WRITE_REPOSITORY_WHITELIST", "acme/demo") + monkeypatch.setenv("AUTOMATION_STALE_DAYS", "30") + return policy_path + + +@pytest.mark.asyncio +async def test_stale_issue_detection_job_finds_old_issues( + automation_env: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Stale issue detection returns issue numbers older than cutoff.""" + + issues = [ + {"number": 1, "updated_at": "2020-01-01T00:00:00Z"}, + {"number": 2, "updated_at": "2999-01-01T00:00:00Z"}, + ] + + monkeypatch.setattr( + "aegis_gitea_mcp.automation.GiteaClient", + lambda token: StubAutomationGiteaClient(token=token, issues=issues), + ) + + manager = AutomationManager() + result = await manager.run_job( + job_name="stale_issue_detection", + owner="acme", + repo="demo", + user_token="user-token", + ) + + assert result["stale_issue_numbers"] == [1] + assert result["stale_count"] == 1 + + +@pytest.mark.asyncio +async def test_auto_issue_creation_requires_token( + automation_env: Path, +) -> None: + """Auto-issue creation is denied when no user token is provided.""" + manager = AutomationManager() + + with pytest.raises(AutomationError, match="missing authenticated user token"): + await manager.run_job( + job_name="auto_issue_creation", + owner="acme", + repo="demo", + user_token=None, + ) + + +@pytest.mark.asyncio +async def test_auto_issue_creation_job_success( + automation_env: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Auto-issue creation succeeds with write mode + scope + token.""" + monkeypatch.setattr( + "aegis_gitea_mcp.automation.GiteaClient", + lambda token: StubAutomationGiteaClient(token=token), + ) + + manager = AutomationManager() + result = await manager.run_job( + job_name="auto_issue_creation", + owner="acme", + repo="demo", + user_token="user-token", + finding_title="Security finding", + finding_body="Details", + ) + + assert result["job"] == "auto_issue_creation" + assert result["issue_number"] == 77 diff --git a/tests/test_gitea_client.py b/tests/test_gitea_client.py new file mode 100644 index 0000000..53750fb --- /dev/null +++ b/tests/test_gitea_client.py @@ -0,0 +1,168 @@ +"""Unit tests for Gitea client request behavior.""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, patch + +import pytest +from httpx import Request, Response + +from aegis_gitea_mcp.config import reset_settings +from aegis_gitea_mcp.gitea_client import ( + GiteaAuthenticationError, + GiteaAuthorizationError, + GiteaClient, + GiteaError, + GiteaNotFoundError, +) + + +@pytest.fixture(autouse=True) +def gitea_env(monkeypatch: pytest.MonkeyPatch) -> None: + """Provide minimal environment for client initialization.""" + reset_settings() + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("GITEA_TOKEN", "legacy-token") + monkeypatch.setenv("MCP_API_KEYS", "a" * 64) + monkeypatch.setenv("ENVIRONMENT", "test") + yield + reset_settings() + + +@pytest.mark.asyncio +async def test_client_context_uses_bearer_header() -> None: + """HTTP client is created with bearer token and closed on exit.""" + with patch("aegis_gitea_mcp.gitea_client.AsyncClient") as mock_async_client: + mock_instance = AsyncMock() + mock_async_client.return_value = mock_instance + + async with GiteaClient(token="user-oauth-token"): + pass + + _, kwargs = mock_async_client.call_args + assert kwargs["headers"]["Authorization"] == "Bearer user-oauth-token" + mock_instance.aclose.assert_awaited_once() + + +def test_client_requires_non_empty_token() -> None: + """Client construction fails when token is missing.""" + with pytest.raises(ValueError, match="non-empty"): + GiteaClient(token=" ") + + +def test_handle_response_maps_error_codes() -> None: + """HTTP status codes map to explicit domain exceptions.""" + client = GiteaClient(token="user-token") + request = Request("GET", "https://gitea.example.com/api/v1/user") + + with pytest.raises(GiteaAuthenticationError): + client._handle_response(Response(401, request=request), correlation_id="c1") + + with pytest.raises(GiteaAuthorizationError): + client._handle_response(Response(403, request=request), correlation_id="c2") + + with pytest.raises(GiteaNotFoundError): + client._handle_response(Response(404, request=request), correlation_id="c3") + + with pytest.raises(GiteaError, match="boom"): + client._handle_response( + Response(500, request=request, json={"message": "boom"}), + correlation_id="c4", + ) + + assert client._handle_response(Response(200, request=request, json={"ok": True}), "c5") == { + "ok": True + } + + +@pytest.mark.asyncio +async def test_public_methods_delegate_to_request_and_normalize() -> None: + """Wrapper methods call shared request logic and normalize return types.""" + client = GiteaClient(token="user-token") + + async def fake_request(method: str, endpoint: str, **kwargs): + if endpoint == "/api/v1/user": + return {"login": "alice"} + if endpoint == "/api/v1/user/repos": + return [{"name": "repo"}] + if endpoint == "/api/v1/repos/acme/demo": + return {"name": "demo"} + if endpoint == "/api/v1/repos/acme/demo/contents/README.md": + return {"size": 8, "content": "aGVsbG8=", "encoding": "base64"} + if endpoint == "/api/v1/repos/acme/demo/git/trees/main": + return {"tree": [{"path": "README.md"}]} + if endpoint == "/api/v1/repos/acme/demo/search": + return {"hits": []} + if endpoint == "/api/v1/repos/acme/demo/commits": + return [{"sha": "abc"}] + if endpoint == "/api/v1/repos/acme/demo/git/commits/abc": + return {"sha": "abc"} + if endpoint == "/api/v1/repos/acme/demo/compare/main...feature": + return {"total_commits": 1} + if endpoint == "/api/v1/repos/acme/demo/issues": + if method == "GET": + return [{"number": 1}] + return {"number": 12} + if endpoint == "/api/v1/repos/acme/demo/issues/1": + if method == "GET": + return {"number": 1} + return {"number": 1, "state": "closed"} + if endpoint == "/api/v1/repos/acme/demo/pulls": + return [{"number": 2}] + if endpoint == "/api/v1/repos/acme/demo/pulls/2": + return {"number": 2} + if endpoint == "/api/v1/repos/acme/demo/labels": + return [{"name": "bug"}] + if endpoint == "/api/v1/repos/acme/demo/tags": + return [{"name": "v1"}] + if endpoint == "/api/v1/repos/acme/demo/releases": + return [{"id": 1}] + if endpoint == "/api/v1/repos/acme/demo/issues/1/comments": + return {"id": 9} + if endpoint == "/api/v1/repos/acme/demo/issues/1/labels": + return {"labels": [{"name": "bug"}]} + if endpoint == "/api/v1/repos/acme/demo/issues/1/assignees": + return {"assignees": [{"login": "alice"}]} + return {} + + client._request = AsyncMock(side_effect=fake_request) # type: ignore[method-assign] + + assert (await client.get_current_user())["login"] == "alice" + assert len(await client.list_repositories()) == 1 + assert (await client.get_repository("acme", "demo"))["name"] == "demo" + assert (await client.get_file_contents("acme", "demo", "README.md"))["size"] == 8 + assert len((await client.get_tree("acme", "demo"))["tree"]) == 1 + assert isinstance( + await client.search_code("acme", "demo", "needle", ref="main", page=1, limit=5), dict + ) + assert len(await client.list_commits("acme", "demo", ref="main", page=1, limit=5)) == 1 + assert (await client.get_commit_diff("acme", "demo", "abc"))["sha"] == "abc" + assert isinstance(await client.compare_refs("acme", "demo", "main", "feature"), dict) + assert len(await client.list_issues("acme", "demo", state="open", page=1, limit=10)) == 1 + assert (await client.get_issue("acme", "demo", 1))["number"] == 1 + assert len(await client.list_pull_requests("acme", "demo", state="open", page=1, limit=10)) == 1 + assert (await client.get_pull_request("acme", "demo", 2))["number"] == 2 + assert len(await client.list_labels("acme", "demo", page=1, limit=10)) == 1 + assert len(await client.list_tags("acme", "demo", page=1, limit=10)) == 1 + assert len(await client.list_releases("acme", "demo", page=1, limit=10)) == 1 + assert (await client.create_issue("acme", "demo", title="Hi", body="Body"))["number"] == 12 + assert (await client.update_issue("acme", "demo", 1, state="closed"))["state"] == "closed" + assert (await client.create_issue_comment("acme", "demo", 1, "comment"))["id"] == 9 + assert (await client.create_pr_comment("acme", "demo", 1, "comment"))["id"] == 9 + assert isinstance(await client.add_labels("acme", "demo", 1, ["bug"]), dict) + assert isinstance(await client.assign_issue("acme", "demo", 1, ["alice"]), dict) + + +@pytest.mark.asyncio +async def test_get_file_contents_blocks_oversized_payload(monkeypatch: pytest.MonkeyPatch) -> None: + """File size limits are enforced before returning content.""" + monkeypatch.setenv("MAX_FILE_SIZE_BYTES", "5") + reset_settings() + client = GiteaClient(token="user-token") + + client._request = AsyncMock( # type: ignore[method-assign] + return_value={"size": 50, "content": "x", "encoding": "base64"} + ) + + with pytest.raises(GiteaError, match="exceeds limit"): + await client.get_file_contents("acme", "demo", "big.bin") diff --git a/tests/test_integration.py b/tests/test_integration.py index f9333f7..986a19c 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -1,225 +1,120 @@ -"""Integration tests for the complete system.""" +"""Integration tests for end-to-end MCP authentication behavior.""" + +from __future__ import annotations + +from unittest.mock import patch import pytest from fastapi.testclient import TestClient -from aegis_gitea_mcp.auth import reset_validator from aegis_gitea_mcp.config import reset_settings +from aegis_gitea_mcp.oauth import reset_oauth_validator @pytest.fixture(autouse=True) -def reset_state(): +def reset_state() -> None: """Reset global state between tests.""" reset_settings() - reset_validator() + reset_oauth_validator() yield reset_settings() - reset_validator() + reset_oauth_validator() @pytest.fixture -def full_env(monkeypatch): - """Set up complete test environment.""" +def full_env(monkeypatch: pytest.MonkeyPatch) -> None: + """Set OAuth-enabled environment for integration tests.""" monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") - monkeypatch.setenv("GITEA_TOKEN", "test-gitea-token-12345") + monkeypatch.setenv("OAUTH_MODE", "true") + monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id") + monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret") monkeypatch.setenv("ENVIRONMENT", "test") - monkeypatch.setenv("AUTH_ENABLED", "true") - monkeypatch.setenv("MCP_API_KEYS", f"{'a' * 64},{'b' * 64}") monkeypatch.setenv("MCP_HOST", "127.0.0.1") monkeypatch.setenv("MCP_PORT", "8080") monkeypatch.setenv("LOG_LEVEL", "INFO") - monkeypatch.setenv("MAX_AUTH_FAILURES", "5") - monkeypatch.setenv("AUTH_FAILURE_WINDOW", "300") monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "false") @pytest.fixture -def client(full_env): - """Create test client with full environment.""" +def client(full_env: None, monkeypatch: pytest.MonkeyPatch) -> TestClient: + """Create test client with deterministic OAuth behavior.""" + + async def _validate(_self, token: str | None, _ip: str, _ua: str): + if token == "valid-read-token": + return True, None, {"login": "alice", "scopes": ["read:repository"]} + return False, "Invalid or expired OAuth token.", None + + monkeypatch.setattr( + "aegis_gitea_mcp.oauth.GiteaOAuthValidator.validate_oauth_token", + _validate, + ) + from aegis_gitea_mcp.server import app return TestClient(app) -def test_complete_authentication_flow(client): - """Test complete authentication flow from start to finish.""" - # 1. Health check should work without auth - response = client.get("/health") - assert response.status_code == 200 +def test_no_token_returns_401_with_www_authenticate(client: TestClient) -> None: + """Missing bearer token is rejected with OAuth challenge metadata.""" + response = client.post( + "/mcp/tool/call", + json={"tool": "list_repositories", "arguments": {}}, + ) - # 2. Tool listing should work without auth (Mixed mode for ChatGPT) - response = client.get("/mcp/tools") - assert response.status_code == 200 + assert response.status_code == 401 + assert "WWW-Authenticate" in response.headers + assert "resource_metadata=" in response.headers["WWW-Authenticate"] + + +def test_invalid_token_returns_401(client: TestClient) -> None: + """Invalid OAuth token is rejected.""" + response = client.post( + "/mcp/tool/call", + headers={"Authorization": "Bearer invalid-token"}, + json={"tool": "list_repositories", "arguments": {}}, + ) - # 3. Protected endpoint (tool execution) should reject without auth - response = client.post("/mcp/tool/call", json={"tool": "list_repositories", "arguments": {}}) assert response.status_code == 401 - # 4. Protected endpoint should reject with invalid key - response = client.post( - "/mcp/tool/call", - headers={"Authorization": "Bearer " + "c" * 64}, - json={"tool": "list_repositories", "arguments": {}}, - ) - assert response.status_code == 401 - # 5. Protected endpoint should pass auth with valid key (first key) - # Note: May fail with 500 due to missing Gitea connection, but auth passes - response = client.post( - "/mcp/tool/call", - headers={"Authorization": "Bearer " + "a" * 64}, - json={"tool": "list_repositories", "arguments": {}}, - ) - assert response.status_code != 401 +def test_valid_token_executes_tool(client: TestClient) -> None: + """Valid OAuth token allows tool execution.""" + with patch("aegis_gitea_mcp.gitea_client.GiteaClient.list_repositories") as mock_list_repos: + mock_list_repos.return_value = [{"id": 1, "name": "repo-one", "owner": {"login": "alice"}}] - # 6. Protected endpoint should pass auth with valid key (second key) - response = client.post( - "/mcp/tool/call", - headers={"Authorization": "Bearer " + "b" * 64}, - json={"tool": "list_repositories", "arguments": {}}, - ) - assert response.status_code != 401 - - -def test_key_rotation_simulation(client, monkeypatch): - """Simulate key rotation with grace period.""" - # Start with key A - response = client.get("/mcp/tools", headers={"Authorization": "Bearer " + "a" * 64}) - assert response.status_code == 200 - - # Both keys A and B work (grace period) - response = client.get("/mcp/tools", headers={"Authorization": "Bearer " + "a" * 64}) - assert response.status_code == 200 - - response = client.get("/mcp/tools", headers={"Authorization": "Bearer " + "b" * 64}) - assert response.status_code == 200 - - -def test_multiple_tool_calls_with_auth(client): - """Test multiple tool calls with authentication.""" - headers = {"Authorization": "Bearer " + "a" * 64} - - # List tools - response = client.get("/mcp/tools", headers=headers) - assert response.status_code == 200 - tools = response.json()["tools"] - - # Try to call each tool (will fail without proper Gitea connection, but auth should work) - for tool in tools: response = client.post( - "/mcp/tool/call", headers=headers, json={"tool": tool["name"], "arguments": {}} + "/mcp/tool/call", + headers={"Authorization": "Bearer valid-read-token"}, + json={"tool": "list_repositories", "arguments": {}}, ) - # Should pass auth but may fail on actual execution (Gitea not available in tests) - assert response.status_code != 401 # Not auth error - - -def test_concurrent_requests_different_ips(client): - """Test that different IPs are tracked separately for rate limiting.""" - # This is a simplified test since we can't easily simulate different IPs in TestClient - # But we can verify rate limiting works for single IP - - headers_invalid = {"Authorization": "Bearer " + "x" * 64} - tool_call_data = {"tool": "list_repositories", "arguments": {}} - - # Make 5 failed attempts on protected endpoint - for _ in range(5): - response = client.post("/mcp/tool/call", headers=headers_invalid, json=tool_call_data) - assert response.status_code == 401 - - # 6th attempt should be rate limited - response = client.post("/mcp/tool/call", headers=headers_invalid, json=tool_call_data) - assert response.status_code == 401 - data = response.json() - assert "Too many failed" in data["message"] - - # Note: Rate limiting is IP-based, so even valid keys from the same IP are blocked - # This is a security feature to prevent brute force attacks - response = client.post( - "/mcp/tool/call", headers={"Authorization": "Bearer " + "a" * 64}, json=tool_call_data - ) - # After rate limit is triggered, all requests from that IP are blocked - assert response.status_code == 401 - - -def test_all_mcp_tools_discoverable(client): - """Test that all MCP tools are properly registered and discoverable.""" - response = client.get("/mcp/tools", headers={"Authorization": "Bearer " + "a" * 64}) assert response.status_code == 200 - data = response.json() - tools = data["tools"] - - # Expected tools - expected_tools = [ - "list_repositories", - "get_repository_info", - "get_file_tree", - "get_file_contents", - "search_code", - "list_commits", - "get_commit_diff", - "compare_refs", - "list_issues", - "get_issue", - "list_pull_requests", - "get_pull_request", - "list_labels", - "list_tags", - "list_releases", - "create_issue", - "update_issue", - "create_issue_comment", - "create_pr_comment", - "add_labels", - "assign_issue", - ] - - tool_names = [tool["name"] for tool in tools] - - for expected in expected_tools: - assert expected in tool_names, f"Tool {expected} not found in registered tools" - - # Verify each tool has required fields - for tool in tools: - assert "name" in tool - assert "description" in tool - assert "inputSchema" in tool - assert tool["description"] # Not empty - assert "type" in tool["inputSchema"] + payload = response.json() + assert payload["success"] is True + assert "result" in payload -def test_error_responses_include_helpful_messages(client): - """Test that error responses include helpful messages for users.""" - tool_data = {"tool": "list_repositories", "arguments": {}} - - # Missing auth on protected endpoint - response = client.post("/mcp/tool/call", json=tool_data) - assert response.status_code == 401 - data = response.json() - assert "Authorization" in data["detail"] or "Authentication" in data["error"] - - # Invalid key format +def test_write_scope_enforcement_returns_403(client: TestClient) -> None: + """Write tool calls are denied when token lacks write scope.""" response = client.post( - "/mcp/tool/call", headers={"Authorization": "Bearer short"}, json=tool_data + "/mcp/tool/call", + headers={"Authorization": "Bearer valid-read-token"}, + json={ + "tool": "create_issue", + "arguments": {"owner": "acme", "repo": "demo", "title": "Needs write scope"}, + }, + ) + + assert response.status_code == 403 + assert "required scope: write:repository" in response.json()["detail"].lower() + + +def test_error_responses_include_helpful_messages(client: TestClient) -> None: + """Auth failures include actionable guidance.""" + response = client.post( + "/mcp/tool/call", + json={"tool": "list_repositories", "arguments": {}}, ) assert response.status_code == 401 data = response.json() - assert ( - "Invalid" in data.get("message", "") - or "format" in data.get("message", "").lower() - or "Authentication" in data.get("error", "") - ) - - -def test_audit_logging_integration(client, tmp_path, monkeypatch): - """Test that audit logging works with authentication.""" - # Set audit log to temp file - audit_log = tmp_path / "audit.log" - monkeypatch.setenv("AUDIT_LOG_PATH", str(audit_log)) - - # Make authenticated request - response = client.get("/mcp/tools", headers={"Authorization": "Bearer " + "a" * 64}) - assert response.status_code == 200 - - # Note: In real system, audit logs would be written - # This test verifies the system doesn't crash with audit logging enabled + assert "Provide Authorization" in data["message"] diff --git a/tests/test_oauth.py b/tests/test_oauth.py new file mode 100644 index 0000000..fae7bd8 --- /dev/null +++ b/tests/test_oauth.py @@ -0,0 +1,379 @@ +"""Tests for OAuth2 per-user Gitea authentication.""" + +from __future__ import annotations + +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from fastapi.testclient import TestClient + +from aegis_gitea_mcp.config import reset_settings +from aegis_gitea_mcp.oauth import GiteaOAuthValidator, get_oauth_validator, reset_oauth_validator +from aegis_gitea_mcp.request_context import ( + get_gitea_user_login, + get_gitea_user_token, + set_gitea_user_login, + set_gitea_user_token, +) + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def reset_state(): + """Reset global state between tests.""" + reset_settings() + reset_oauth_validator() + yield + reset_settings() + reset_oauth_validator() + + +@pytest.fixture +def mock_env_oauth(monkeypatch): + """Environment for OAuth mode tests.""" + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("ENVIRONMENT", "test") + monkeypatch.setenv("OAUTH_MODE", "true") + monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id") + monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret") + monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "false") + + +@pytest.fixture +def oauth_validator(mock_env_oauth): + """Create GiteaOAuthValidator instance in OAuth mode.""" + return GiteaOAuthValidator() + + +@pytest.fixture +def oauth_client(mock_env_oauth): + """Create FastAPI test client in OAuth mode.""" + from aegis_gitea_mcp.server import app + + return TestClient(app, raise_server_exceptions=False) + + +# --------------------------------------------------------------------------- +# GiteaOAuthValidator unit tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_validate_oauth_token_success(oauth_validator): + """Valid Gitea OAuth token returns is_valid=True and user_data.""" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"login": "testuser", "id": 42} + + with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=mock_response) + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + + is_valid, error, user_data = await oauth_validator.validate_oauth_token( + "valid-gitea-token", "127.0.0.1", "TestAgent/1.0" + ) + + assert is_valid is True + assert error is None + assert user_data is not None + assert user_data["login"] == "testuser" + + +@pytest.mark.asyncio +async def test_validate_oauth_token_invalid_401(oauth_validator): + """Gitea returning 401 results in is_valid=False.""" + mock_response = MagicMock() + mock_response.status_code = 401 + + with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=mock_response) + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + + is_valid, error, user_data = await oauth_validator.validate_oauth_token( + "expired-token", "127.0.0.1", "TestAgent/1.0" + ) + + assert is_valid is False + assert error is not None + assert user_data is None + + +@pytest.mark.asyncio +async def test_validate_oauth_token_missing_token(oauth_validator): + """Missing token results in is_valid=False.""" + is_valid, error, user_data = await oauth_validator.validate_oauth_token( + None, "127.0.0.1", "TestAgent/1.0" + ) + + assert is_valid is False + assert error is not None + assert user_data is None + + +@pytest.mark.asyncio +async def test_validate_oauth_token_network_error(oauth_validator): + """Network error results in is_valid=False with informative message.""" + import httpx + + with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.get = AsyncMock( + side_effect=httpx.RequestError("Connection refused", request=MagicMock()) + ) + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + + is_valid, error, user_data = await oauth_validator.validate_oauth_token( + "some-token", "127.0.0.1", "TestAgent/1.0" + ) + + assert is_valid is False + assert error is not None + assert "unable to validate oauth token" in error.lower() + assert user_data is None + + +@pytest.mark.asyncio +async def test_validate_oauth_token_rate_limit(oauth_validator): + """Exceeding failure threshold triggers rate limiting.""" + mock_response = MagicMock() + mock_response.status_code = 401 + + with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=mock_response) + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + + # Exhaust failures (default MAX_AUTH_FAILURES=5) + for _ in range(5): + await oauth_validator.validate_oauth_token("bad-token", "10.0.0.1", "Agent") + + # Next attempt should be rate-limited + is_valid, error, user_data = await oauth_validator.validate_oauth_token( + "bad-token", "10.0.0.1", "Agent" + ) + + assert is_valid is False + assert error is not None + assert "too many" in error.lower() + + +# --------------------------------------------------------------------------- +# Singleton tests +# --------------------------------------------------------------------------- + + +def test_get_oauth_validator_singleton(mock_env_oauth): + """get_oauth_validator returns the same instance on repeated calls.""" + v1 = get_oauth_validator() + v2 = get_oauth_validator() + assert v1 is v2 + + +def test_reset_oauth_validator(mock_env_oauth): + """reset_oauth_validator creates a fresh instance after reset.""" + v1 = get_oauth_validator() + reset_oauth_validator() + v2 = get_oauth_validator() + assert v1 is not v2 + + +# --------------------------------------------------------------------------- +# ContextVar isolation tests +# --------------------------------------------------------------------------- + + +def test_context_var_token_isolation(): + """ContextVar values do not leak between coroutines.""" + results = {} + + async def task_a(): + set_gitea_user_token("token-for-a") + await asyncio.sleep(0) + results["a"] = get_gitea_user_token() + + async def task_b(): + # task_b never sets the token; should see None (default) + await asyncio.sleep(0) + results["b"] = get_gitea_user_token() + + async def run(): + await asyncio.gather(task_a(), task_b()) + + asyncio.run(run()) + + assert results["a"] == "token-for-a" + assert results["b"] is None # ContextVar isolation: task_b sees default + + +def test_context_var_login_set_and_get(): + """set_gitea_user_login / get_gitea_user_login work correctly.""" + set_gitea_user_login("alice") + assert get_gitea_user_login() == "alice" + + +# --------------------------------------------------------------------------- +# /oauth/token proxy endpoint tests +# --------------------------------------------------------------------------- + + +def test_oauth_token_endpoint_available_when_oauth_mode_false(monkeypatch): + """POST /oauth/token remains available regardless of OAUTH_MODE.""" + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("GITEA_TOKEN", "test-token-12345") + monkeypatch.setenv("ENVIRONMENT", "test") + monkeypatch.setenv("MCP_API_KEYS", "a" * 64) + monkeypatch.setenv("OAUTH_MODE", "false") + monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "false") + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"access_token": "token"} + + from aegis_gitea_mcp.server import app + + with patch("aegis_gitea_mcp.server.httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=mock_response) + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + + with TestClient(app, raise_server_exceptions=False) as client: + response = client.post("/oauth/token", data={"code": "abc123"}) + + assert response.status_code == 200 + + +def test_oauth_token_endpoint_missing_code(oauth_client): + """POST /oauth/token without a code returns 400.""" + response = oauth_client.post("/oauth/token", data={}) + assert response.status_code == 400 + + +def test_oauth_token_endpoint_proxy_success(oauth_client): + """POST /oauth/token proxies successfully to Gitea and returns access_token.""" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "access_token": "gitea-access-token-xyz", + "token_type": "bearer", + } + + with patch("aegis_gitea_mcp.server.httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=mock_response) + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + + response = oauth_client.post( + "/oauth/token", + data={"code": "auth-code-123", "redirect_uri": "https://chat.openai.com/callback"}, + ) + + assert response.status_code == 200 + body = response.json() + assert body["access_token"] == "gitea-access-token-xyz" + + +def test_oauth_token_endpoint_gitea_error(oauth_client): + """POST /oauth/token propagates Gitea error status.""" + mock_response = MagicMock() + mock_response.status_code = 400 + mock_response.json.return_value = {"error": "invalid_grant"} + + with patch("aegis_gitea_mcp.server.httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=mock_response) + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + + response = oauth_client.post("/oauth/token", data={"code": "bad-code"}) + + assert response.status_code == 400 + + +# --------------------------------------------------------------------------- +# Config validation tests +# --------------------------------------------------------------------------- + + +def test_config_oauth_mode_requires_client_id(monkeypatch): + """OAUTH_MODE=true without client_id raises ValueError.""" + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("OAUTH_MODE", "true") + monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "") + monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "some-secret") + + from aegis_gitea_mcp.config import Settings + + with pytest.raises(Exception, match="GITEA_OAUTH_CLIENT_ID"): + Settings() # type: ignore[call-arg] + + +def test_config_oauth_mode_requires_client_secret(monkeypatch): + """OAUTH_MODE=true without client_secret raises ValueError.""" + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("OAUTH_MODE", "true") + monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "some-id") + monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "") + + from aegis_gitea_mcp.config import Settings + + with pytest.raises(Exception, match="GITEA_OAUTH_CLIENT_SECRET"): + Settings() # type: ignore[call-arg] + + +def test_config_standard_mode_requires_gitea_token(monkeypatch): + """Standard mode without GITEA_TOKEN raises ValueError.""" + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("OAUTH_MODE", "false") + monkeypatch.setenv("GITEA_TOKEN", "") + monkeypatch.setenv("MCP_API_KEYS", "a" * 64) + + from aegis_gitea_mcp.config import Settings + + with pytest.raises(Exception, match="GITEA_TOKEN"): + Settings() # type: ignore[call-arg] + + +# --------------------------------------------------------------------------- +# Server middleware: OAuth mode authentication +# --------------------------------------------------------------------------- + + +def test_mcp_tool_call_requires_valid_gitea_token(oauth_client): + """POST /mcp/tool/call with an invalid Gitea token returns 401.""" + mock_response = MagicMock() + mock_response.status_code = 401 + + with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=mock_response) + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + + response = oauth_client.post( + "/mcp/tool/call", + json={"tool": "list_repositories", "arguments": {}}, + headers={"Authorization": "Bearer invalid-token"}, + ) + + assert response.status_code == 401 + + +def test_mcp_tool_call_no_token_returns_401(oauth_client): + """POST /mcp/tool/call without Authorization header returns 401.""" + response = oauth_client.post( + "/mcp/tool/call", + json={"tool": "list_repositories", "arguments": {}}, + ) + assert response.status_code == 401 diff --git a/tests/test_oauth_oidc.py b/tests/test_oauth_oidc.py new file mode 100644 index 0000000..b94e5c0 --- /dev/null +++ b/tests/test_oauth_oidc.py @@ -0,0 +1,150 @@ +"""OIDC/JWKS-focused OAuth validator tests.""" + +from __future__ import annotations + +import json +import time +from unittest.mock import AsyncMock, MagicMock, patch + +import jwt +import pytest +from cryptography.hazmat.primitives.asymmetric import rsa +from jwt.algorithms import RSAAlgorithm + +from aegis_gitea_mcp.config import reset_settings +from aegis_gitea_mcp.oauth import GiteaOAuthValidator, reset_oauth_validator + + +@pytest.fixture(autouse=True) +def reset_state(monkeypatch: pytest.MonkeyPatch) -> None: + """Reset state and configure OAuth validation environment.""" + reset_settings() + reset_oauth_validator() + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("ENVIRONMENT", "test") + monkeypatch.setenv("OAUTH_MODE", "true") + monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id") + monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret") + monkeypatch.setenv("OAUTH_CACHE_TTL_SECONDS", "600") + yield + reset_settings() + reset_oauth_validator() + + +def _build_jwt_fixture() -> tuple[str, dict[str, object]]: + """Generate RS256 access token and matching JWKS payload.""" + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + public_key = private_key.public_key() + jwk = json.loads(RSAAlgorithm.to_jwk(public_key)) + jwk["kid"] = "kid-123" + + now = int(time.time()) + token = jwt.encode( + { + "sub": "user-1", + "preferred_username": "alice", + "scope": "read:repository write:repository", + "aud": "test-client-id", + "iss": "https://gitea.example.com", + "iat": now, + "exp": now + 3600, + }, + private_key, + algorithm="RS256", + headers={"kid": "kid-123"}, + ) + return token, {"keys": [jwk]} + + +@pytest.mark.asyncio +async def test_validate_oauth_token_with_oidc_jwt_and_cache() -> None: + """JWT token validation uses discovery + JWKS and caches both documents.""" + token, jwks = _build_jwt_fixture() + validator = GiteaOAuthValidator() + + discovery_response = MagicMock() + discovery_response.status_code = 200 + discovery_response.json.return_value = { + "issuer": "https://gitea.example.com", + "jwks_uri": "https://gitea.example.com/login/oauth/keys", + } + + jwks_response = MagicMock() + jwks_response.status_code = 200 + jwks_response.json.return_value = jwks + + with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.get = AsyncMock(side_effect=[discovery_response, jwks_response]) + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + + valid1, error1, principal1 = await validator.validate_oauth_token( + token, "127.0.0.1", "TestAgent" + ) + valid2, error2, principal2 = await validator.validate_oauth_token( + token, "127.0.0.1", "TestAgent" + ) + + assert valid1 is True + assert error1 is None + assert principal1 is not None + assert principal1["login"] == "alice" + assert "write:repository" in principal1["scopes"] + + assert valid2 is True + assert error2 is None + assert principal2 is not None + # Discovery + JWKS fetched once each because of cache. + assert mock_client.get.await_count == 2 + + +@pytest.mark.asyncio +async def test_invalid_jwt_falls_back_and_fails_userinfo() -> None: + """Invalid JWT returns auth failure when userinfo fallback rejects token.""" + validator = GiteaOAuthValidator() + + # JWT-shaped token with invalid signature/header. + bad_token = "abc.def.ghi" + + discovery_response = MagicMock() + discovery_response.status_code = 200 + discovery_response.json.return_value = { + "issuer": "https://gitea.example.com", + "jwks_uri": "https://gitea.example.com/login/oauth/keys", + } + + jwks_response = MagicMock() + jwks_response.status_code = 200 + jwks_response.json.return_value = { + "keys": [{"kid": "missing", "kty": "RSA", "n": "x", "e": "AQAB"}] + } + + userinfo_denied = MagicMock() + userinfo_denied.status_code = 401 + + with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.get = AsyncMock( + side_effect=[discovery_response, jwks_response, userinfo_denied] + ) + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + + is_valid, error, principal = await validator.validate_oauth_token( + bad_token, + "127.0.0.1", + "TestAgent", + ) + + assert is_valid is False + assert principal is None + assert error is not None + + +def test_extract_bearer_token_strict_parsing() -> None: + """Bearer extraction accepts only strict `Bearer ` format.""" + assert GiteaOAuthValidator.extract_bearer_token("Bearer abc123") == "abc123" + assert GiteaOAuthValidator.extract_bearer_token("bearer abc123") is None + assert GiteaOAuthValidator.extract_bearer_token("Bearer ") is None + assert GiteaOAuthValidator.extract_bearer_token("Basic abc") is None diff --git a/tests/test_repository_tools.py b/tests/test_repository_tools.py new file mode 100644 index 0000000..3c194c5 --- /dev/null +++ b/tests/test_repository_tools.py @@ -0,0 +1,128 @@ +"""Tests for repository-focused tool handlers.""" + +from __future__ import annotations + +import pytest + +from aegis_gitea_mcp.config import reset_settings +from aegis_gitea_mcp.gitea_client import GiteaError +from aegis_gitea_mcp.tools.repository import ( + get_file_contents_tool, + get_file_tree_tool, + get_repository_info_tool, + list_repositories_tool, +) + + +@pytest.fixture(autouse=True) +def repository_tool_env(monkeypatch: pytest.MonkeyPatch) -> None: + """Provide minimal settings needed by response limit helpers.""" + reset_settings() + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("GITEA_TOKEN", "legacy-token") + monkeypatch.setenv("MCP_API_KEYS", "a" * 64) + monkeypatch.setenv("ENVIRONMENT", "test") + yield + reset_settings() + + +class RepoStub: + """Stub Gitea client for repository tools.""" + + async def list_repositories(self): + return [{"name": "demo", "owner": {"login": "acme"}, "full_name": "acme/demo"}] + + async def get_repository(self, owner, repo): + return {"name": repo, "owner": {"login": owner}, "full_name": f"{owner}/{repo}"} + + async def get_tree(self, owner, repo, ref, recursive): + return {"tree": [{"path": "README.md", "type": "blob", "size": 11, "sha": "abc"}]} + + async def get_file_contents(self, owner, repo, filepath, ref): + return { + "content": "SGVsbG8gV29ybGQ=", + "encoding": "base64", + "size": 11, + "sha": "abc", + "html_url": f"https://example/{owner}/{repo}/{filepath}", + } + + +class RepoErrorStub(RepoStub): + """Stub that raises backend errors.""" + + async def list_repositories(self): + raise GiteaError("backend down") + + +@pytest.mark.asyncio +async def test_list_repositories_tool_success() -> None: + """Repository listing tool normalizes output shape.""" + result = await list_repositories_tool(RepoStub(), {}) + assert result["count"] == 1 + assert result["repositories"][0]["owner"] == "acme" + + +@pytest.mark.asyncio +async def test_list_repositories_tool_failure_mode() -> None: + """Repository listing tool wraps backend errors.""" + with pytest.raises(RuntimeError, match="Failed to list repositories"): + await list_repositories_tool(RepoErrorStub(), {}) + + +@pytest.mark.asyncio +async def test_get_repository_info_tool_success() -> None: + """Repository info tool returns normalized metadata.""" + result = await get_repository_info_tool(RepoStub(), {"owner": "acme", "repo": "demo"}) + assert result["full_name"] == "acme/demo" + + +@pytest.mark.asyncio +async def test_get_file_tree_tool_success() -> None: + """File tree tool returns bounded tree entries.""" + result = await get_file_tree_tool( + RepoStub(), + {"owner": "acme", "repo": "demo", "ref": "main", "recursive": False}, + ) + assert result["count"] == 1 + assert result["tree"][0]["path"] == "README.md" + + +@pytest.mark.asyncio +async def test_get_file_contents_tool_decodes_base64() -> None: + """File contents tool decodes UTF-8 base64 payloads.""" + result = await get_file_contents_tool( + RepoStub(), + {"owner": "acme", "repo": "demo", "filepath": "README.md", "ref": "main"}, + ) + assert result["content"] == "Hello World" + + +@pytest.mark.asyncio +async def test_get_file_contents_tool_handles_invalid_base64() -> None: + """Invalid base64 payloads are returned safely without crashing.""" + + class InvalidBase64Stub(RepoStub): + async def get_file_contents(self, owner, repo, filepath, ref): + return {"content": "%%%not-base64%%%", "encoding": "base64", "size": 4, "sha": "abc"} + + result = await get_file_contents_tool( + InvalidBase64Stub(), + {"owner": "acme", "repo": "demo", "filepath": "README.md", "ref": "main"}, + ) + assert result["content"] == "%%%not-base64%%%" + + +@pytest.mark.asyncio +async def test_get_file_contents_tool_failure_mode() -> None: + """File contents tool wraps backend failures.""" + + class ErrorFileStub(RepoStub): + async def get_file_contents(self, owner, repo, filepath, ref): + raise GiteaError("boom") + + with pytest.raises(RuntimeError, match="Failed to get file contents"): + await get_file_contents_tool( + ErrorFileStub(), + {"owner": "acme", "repo": "demo", "filepath": "README.md", "ref": "main"}, + ) diff --git a/tests/test_server.py b/tests/test_server.py index cfa763f..56a3ac9 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -1,158 +1,144 @@ """Tests for MCP server endpoints.""" +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock, patch + +import httpx import pytest from fastapi.testclient import TestClient -from aegis_gitea_mcp.auth import reset_validator from aegis_gitea_mcp.config import reset_settings -from aegis_gitea_mcp.gitea_client import GiteaAuthenticationError, GiteaAuthorizationError +from aegis_gitea_mcp.oauth import reset_oauth_validator @pytest.fixture(autouse=True) -def reset_state(): +def reset_state() -> None: """Reset global state between tests.""" reset_settings() - reset_validator() + reset_oauth_validator() yield reset_settings() - reset_validator() + reset_oauth_validator() @pytest.fixture -def mock_env(monkeypatch): - """Set up test environment.""" +def oauth_env(monkeypatch: pytest.MonkeyPatch) -> None: + """Set OAuth-first environment for server tests.""" monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") - monkeypatch.setenv("GITEA_TOKEN", "test-gitea-token-12345") + monkeypatch.setenv("OAUTH_MODE", "true") + monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id") + monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret") monkeypatch.setenv("ENVIRONMENT", "test") - monkeypatch.setenv("AUTH_ENABLED", "true") - monkeypatch.setenv("MCP_API_KEYS", "a" * 64) monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "false") @pytest.fixture -def mock_env_auth_disabled(monkeypatch): - """Set up test environment with auth disabled.""" - monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") - monkeypatch.setenv("GITEA_TOKEN", "test-gitea-token-12345") - monkeypatch.setenv("ENVIRONMENT", "test") - monkeypatch.setenv("AUTH_ENABLED", "false") - monkeypatch.setenv("MCP_API_KEYS", "") - monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "false") +def mock_oauth_validation(monkeypatch: pytest.MonkeyPatch) -> None: + """Mock OAuth validator outcomes by token value.""" + + async def _validate(_self, token: str | None, _ip: str, _ua: str): + if token == "valid-read": + return True, None, {"login": "alice", "scopes": ["read:repository"]} + if token == "valid-write": + return ( + True, + None, + { + "login": "alice", + "scopes": ["read:repository", "write:repository"], + }, + ) + return False, "Invalid or expired OAuth token.", None + + monkeypatch.setattr( + "aegis_gitea_mcp.oauth.GiteaOAuthValidator.validate_oauth_token", + _validate, + ) @pytest.fixture -def client(mock_env): - """Create test client.""" - # Import after setting env vars +def client(oauth_env: None, mock_oauth_validation: None) -> TestClient: + """Create FastAPI test client.""" from aegis_gitea_mcp.server import app return TestClient(app) -@pytest.fixture -def client_no_auth(mock_env_auth_disabled): - """Create test client with auth disabled.""" - from aegis_gitea_mcp.server import app - - return TestClient(app) - - -def test_root_endpoint(client): - """Test root endpoint returns server info.""" +def test_root_endpoint(client: TestClient) -> None: + """Root endpoint returns server metadata.""" response = client.get("/") - assert response.status_code == 200 data = response.json() assert data["name"] == "AegisGitea MCP Server" - assert "version" in data assert data["status"] == "running" -def test_health_endpoint(client): - """Test health check endpoint.""" +def test_health_endpoint(client: TestClient) -> None: + """Health endpoint does not require auth.""" response = client.get("/health") + assert response.status_code == 200 + assert response.json()["status"] == "healthy" + + +def test_oauth_protected_resource_metadata(client: TestClient) -> None: + """OAuth protected-resource metadata contains required OpenAI-compatible fields.""" + response = client.get("/.well-known/oauth-protected-resource") assert response.status_code == 200 data = response.json() - assert data["status"] == "healthy" + assert data["resource"] == "https://gitea.example.com" + assert data["authorization_servers"] == ["https://gitea.example.com"] + assert data["bearer_methods_supported"] == ["header"] + assert data["scopes_supported"] == ["read:repository", "write:repository"] + assert "resource_documentation" in data -def test_metrics_endpoint(client): - """Metrics endpoint should be available for observability.""" - response = client.get("/metrics") +def test_oauth_authorization_server_metadata(client: TestClient) -> None: + """Auth server metadata includes expected OAuth endpoints and scopes.""" + response = client.get("/.well-known/oauth-authorization-server") assert response.status_code == 200 - assert "aegis_http_requests_total" in response.text + payload = response.json() + assert payload["authorization_endpoint"].endswith("/login/oauth/authorize") + assert payload["token_endpoint"].endswith("/oauth/token") + assert payload["scopes_supported"] == ["read:repository", "write:repository"] -def test_health_endpoint_no_auth_required(client): - """Test that health check doesn't require authentication.""" - response = client.get("/health") - - # Should work without Authorization header - assert response.status_code == 200 - - -def test_list_tools_without_auth(client): - """Test that /mcp/tools is public (Mixed mode for ChatGPT).""" +def test_list_tools_without_auth(client: TestClient) -> None: + """Tool listing remains discoverable without auth.""" response = client.get("/mcp/tools") - - # Tool listing is public to support ChatGPT discovery assert response.status_code == 200 - data = response.json() - assert "tools" in data + assert "tools" in response.json() -def test_list_tools_with_invalid_key(client): - """Test /mcp/tools works even with invalid key (public endpoint).""" - response = client.get( - "/mcp/tools", - headers={"Authorization": "Bearer invalid-key-12345678901234567890123456789012"}, +def test_call_tool_without_auth_returns_challenge(client: TestClient) -> None: + """Tool calls without bearer token return 401 + WWW-Authenticate challenge.""" + response = client.post("/mcp/tool/call", json={"tool": "list_repositories", "arguments": {}}) + + assert response.status_code == 401 + assert "WWW-Authenticate" in response.headers + challenge = response.headers["WWW-Authenticate"] + assert 'resource_metadata="http://testserver/.well-known/oauth-protected-resource"' in challenge + assert 'scope="read:repository"' in challenge + + +def test_call_tool_invalid_token_returns_challenge(client: TestClient) -> None: + """Invalid bearer token returns 401 + WWW-Authenticate challenge.""" + response = client.post( + "/mcp/tool/call", + headers={"Authorization": "Bearer invalid-token"}, + json={"tool": "list_repositories", "arguments": {}}, ) - # Tool listing is public, so even invalid keys can list tools - assert response.status_code == 200 + assert response.status_code == 401 + assert "WWW-Authenticate" in response.headers -def test_list_tools_with_valid_key(client, mock_env): - """Test /mcp/tools with valid API key.""" - response = client.get("/mcp/tools", headers={"Authorization": f"Bearer {'a' * 64}"}) - - assert response.status_code == 200 - data = response.json() - assert "tools" in data - assert len(data["tools"]) > 0 - - # Check tool structure - tool = data["tools"][0] - assert "name" in tool - assert "description" in tool - assert "inputSchema" in tool - - -def test_list_tools_with_query_param(client): - """Test /mcp/tools with API key in query parameter.""" - response = client.get(f"/mcp/tools?api_key={'a' * 64}") - - assert response.status_code == 200 - data = response.json() - assert "tools" in data - assert len(data["tools"]) > 0 - - -def test_list_tools_no_auth_when_disabled(client_no_auth): - """Test that /mcp/tools works without auth when disabled.""" - response = client_no_auth.get("/mcp/tools") - - # Should work without Authorization header when auth is disabled - assert response.status_code == 200 - data = response.json() - assert "tools" in data - - -def test_sse_tools_list_returns_camel_case_schema(client): +def test_sse_tools_list_returns_camel_case_schema(client: TestClient) -> None: """SSE tools/list returns MCP-compatible camelCase inputSchema.""" response = client.post( - f"/mcp/sse?api_key={'a' * 64}", + "/mcp/sse", + headers={"Authorization": "Bearer valid-read"}, json={"jsonrpc": "2.0", "id": "1", "method": "tools/list"}, ) @@ -160,48 +146,95 @@ def test_sse_tools_list_returns_camel_case_schema(client): data = response.json() assert "result" in data assert "tools" in data["result"] - tool = data["result"]["tools"][0] - assert "inputSchema" in tool - assert "type" in tool["inputSchema"] + assert "inputSchema" in data["result"]["tools"][0] -def test_call_tool_without_auth(client): - """Test that /mcp/tool/call requires authentication.""" - response = client.post("/mcp/tool/call", json={"tool": "list_repositories", "arguments": {}}) - - assert response.status_code == 401 - - -def test_call_tool_with_invalid_key(client): - """Test /mcp/tool/call with invalid API key.""" +def test_sse_initialize_message(client: TestClient) -> None: + """SSE initialize message returns protocol and server metadata.""" response = client.post( - "/mcp/tool/call", - headers={"Authorization": "Bearer invalid-key-12345678901234567890123456789012"}, - json={"tool": "list_repositories", "arguments": {}}, + "/mcp/sse", + headers={"Authorization": "Bearer valid-read"}, + json={"jsonrpc": "2.0", "id": "init-1", "method": "initialize"}, ) - assert response.status_code == 401 + assert response.status_code == 200 + payload = response.json() + assert payload["result"]["protocolVersion"] == "2024-11-05" + assert payload["result"]["serverInfo"]["name"] == "AegisGitea MCP" -def test_call_nonexistent_tool(client): - """Test calling a tool that doesn't exist.""" +def test_sse_tools_call_success_response( + client: TestClient, monkeypatch: pytest.MonkeyPatch +) -> None: + """SSE tools/call wraps successful tool output in text content.""" + + async def _fake_execute(tool_name: str, arguments: dict, correlation_id: str) -> dict: + assert tool_name == "list_repositories" + assert isinstance(arguments, dict) + assert correlation_id + return {"ok": True} + + monkeypatch.setattr("aegis_gitea_mcp.server._execute_tool_call", _fake_execute) + + response = client.post( + "/mcp/sse", + headers={"Authorization": "Bearer valid-read"}, + json={ + "jsonrpc": "2.0", + "id": "call-1", + "method": "tools/call", + "params": {"name": "list_repositories", "arguments": {}}, + }, + ) + + assert response.status_code == 200 + assert '"ok": true' in response.json()["result"]["content"][0]["text"].lower() + + +def test_sse_tools_call_http_exception(client: TestClient, monkeypatch: pytest.MonkeyPatch) -> None: + """SSE tools/call maps HTTPException to JSON-RPC error envelope.""" + + async def _fake_execute(_tool_name: str, _arguments: dict, _correlation_id: str) -> dict: + from fastapi import HTTPException + + raise HTTPException(status_code=403, detail="Insufficient scope") + + monkeypatch.setattr("aegis_gitea_mcp.server._execute_tool_call", _fake_execute) + + response = client.post( + "/mcp/sse", + headers={"Authorization": "Bearer valid-read"}, + json={ + "jsonrpc": "2.0", + "id": "call-2", + "method": "tools/call", + "params": {"name": "create_issue", "arguments": {}}, + }, + ) + + assert response.status_code == 200 + body = response.json() + assert body["error"]["code"] == -32000 + assert "insufficient scope" in body["error"]["message"].lower() + + +def test_call_nonexistent_tool(client: TestClient) -> None: + """Unknown tools return 404 after successful auth.""" response = client.post( "/mcp/tool/call", - headers={"Authorization": f"Bearer {'a' * 64}"}, + headers={"Authorization": "Bearer valid-read"}, json={"tool": "nonexistent_tool", "arguments": {}}, ) - # Tool not found returns 404 (auth passes but tool missing) assert response.status_code == 404 - data = response.json() - assert "not found" in data["detail"].lower() + assert "not found" in response.json()["detail"].lower() -def test_write_tool_denied_by_default_policy(client): - """Write tools must be denied when write mode is disabled.""" +def test_write_scope_enforced_before_policy(client: TestClient) -> None: + """Write tools require write:repository scope.""" response = client.post( "/mcp/tool/call", - headers={"Authorization": f"Bearer {'a' * 64}"}, + headers={"Authorization": "Bearer valid-read"}, json={ "tool": "create_issue", "arguments": {"owner": "acme", "repo": "demo", "title": "test"}, @@ -209,96 +242,74 @@ def test_write_tool_denied_by_default_policy(client): ) assert response.status_code == 403 - data = response.json() - assert "policy denied" in data["detail"].lower() + assert "required scope: write:repository" in response.json()["detail"].lower() -def test_sse_endpoint_without_auth(client): - """Test that SSE endpoint requires authentication.""" - response = client.get("/mcp/sse") - - assert response.status_code == 401 - - -def test_auth_header_formats(client): - """Test various Authorization header formats on protected endpoint.""" - # Test with /mcp/tool/call since /mcp/tools is now public - tool_data = {"tool": "list_repositories", "arguments": {}} - - # Missing "Bearer" prefix - response = client.post("/mcp/tool/call", headers={"Authorization": "a" * 64}, json=tool_data) - assert response.status_code == 401 - - # Wrong case +def test_write_tool_denied_by_default_policy(client: TestClient) -> None: + """Even with write scope, write mode stays denied by default policy.""" response = client.post( - "/mcp/tool/call", headers={"Authorization": "bearer " + "a" * 64}, json=tool_data + "/mcp/tool/call", + headers={"Authorization": "Bearer valid-write"}, + json={ + "tool": "create_issue", + "arguments": {"owner": "acme", "repo": "demo", "title": "test"}, + }, ) - assert response.status_code == 401 - # Extra spaces - response = client.post( - "/mcp/tool/call", headers={"Authorization": f"Bearer {'a' * 64}"}, json=tool_data - ) - assert response.status_code == 401 + assert response.status_code == 403 + assert "write mode is disabled" in response.json()["detail"].lower() -def test_rate_limiting(client): - """Test rate limiting after multiple failed auth attempts.""" - tool_data = {"tool": "list_repositories", "arguments": {}} +@pytest.mark.asyncio +async def test_startup_event_fails_when_discovery_unreachable( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Startup validation fails with clear guidance if OIDC discovery is unreachable.""" + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("ENVIRONMENT", "production") + monkeypatch.setenv("OAUTH_MODE", "true") + monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id") + monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret") + monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "true") - # Make 6 failed attempts on protected endpoint - for _ in range(6): - response = client.post( - "/mcp/tool/call", headers={"Authorization": "Bearer " + "x" * 64}, json=tool_data + from aegis_gitea_mcp import server + + with patch("aegis_gitea_mcp.server.httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.get = AsyncMock( + side_effect=httpx.RequestError("connect failed", request=MagicMock()) ) + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) - # Last response should mention rate limiting - data = response.json() - assert "Too many failed" in data["message"] + with pytest.raises( + RuntimeError, + match="unable to reach Gitea OIDC discovery endpoint", + ): + await server.startup_event() @pytest.mark.asyncio -async def test_startup_event_fails_with_authentication_guidance(monkeypatch): - """Startup validation should fail with explicit auth guidance on 401.""" +async def test_startup_event_succeeds_when_discovery_ready( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Startup validation succeeds when OIDC discovery returns HTTP 200.""" monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") - monkeypatch.setenv("GITEA_TOKEN", "test-gitea-token-12345") monkeypatch.setenv("ENVIRONMENT", "production") - monkeypatch.setenv("AUTH_ENABLED", "true") - monkeypatch.setenv("MCP_API_KEYS", "a" * 64) + monkeypatch.setenv("OAUTH_MODE", "true") + monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id") + monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret") monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "true") from aegis_gitea_mcp import server - async def raise_auth_error(*_args, **_kwargs): - raise GiteaAuthenticationError("Authentication failed - check bot token") + mock_response = MagicMock() + mock_response.status_code = 200 - monkeypatch.setattr(server.GiteaClient, "get_current_user", raise_auth_error) + with patch("aegis_gitea_mcp.server.httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=mock_response) + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) - with pytest.raises( - RuntimeError, match=r"Startup validation failed: Gitea authentication was rejected" - ): - await server.startup_event() - - -@pytest.mark.asyncio -async def test_startup_event_fails_with_authorization_guidance(monkeypatch): - """Startup validation should fail with explicit permission guidance on 403.""" - monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") - monkeypatch.setenv("GITEA_TOKEN", "test-gitea-token-12345") - monkeypatch.setenv("ENVIRONMENT", "production") - monkeypatch.setenv("AUTH_ENABLED", "true") - monkeypatch.setenv("MCP_API_KEYS", "a" * 64) - monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "true") - - from aegis_gitea_mcp import server - - async def raise_authorization_error(*_args, **_kwargs): - raise GiteaAuthorizationError("Bot user lacks permission for this operation") - - monkeypatch.setattr(server.GiteaClient, "get_current_user", raise_authorization_error) - - with pytest.raises( - RuntimeError, - match=r"Startup validation failed: Gitea token lacks permission for /api/v1/user", - ): await server.startup_event()