Compare commits
5 Commits
227122263b
...
aefb243a05
| Author | SHA1 | Date | |
|---|---|---|---|
|
aefb243a05
|
|||
|
7f7aaab5a6
|
|||
|
8c84d76bd5
|
|||
|
8e41fd12af
|
|||
|
2844c42ec8
|
@@ -63,6 +63,17 @@ WRITE_MODE=false
|
|||||||
WRITE_REPOSITORY_WHITELIST=
|
WRITE_REPOSITORY_WHITELIST=
|
||||||
WRITE_ALLOW_ALL_TOKEN_REPOS=false
|
WRITE_ALLOW_ALL_TOKEN_REPOS=false
|
||||||
|
|
||||||
|
# Raw API dispatch (gitea_request escape hatch). See docs/raw-api.md.
|
||||||
|
# gitea_request can call any Gitea REST endpoint (method + path). It is still
|
||||||
|
# subject to policy.yaml, WRITE_MODE + the write whitelist, and a built-in
|
||||||
|
# admin/credential denylist. Set RAW_API_ENABLED=false to remove the tool's
|
||||||
|
# ability to dispatch entirely.
|
||||||
|
RAW_API_ENABLED=true
|
||||||
|
# Allow gitea_request to reach admin/credential surfaces (/admin, *tokens*,
|
||||||
|
# *secrets*, *hooks*, *keys*, applications/oauth2, runner registration tokens).
|
||||||
|
# Leave false unless you fully understand the exposure.
|
||||||
|
RAW_API_ALLOW_SENSITIVE=false
|
||||||
|
|
||||||
# Automation mode (disabled by default)
|
# Automation mode (disabled by default)
|
||||||
AUTOMATION_ENABLED=false
|
AUTOMATION_ENABLED=false
|
||||||
AUTOMATION_SCHEDULER_ENABLED=false
|
AUTOMATION_SCHEDULER_ENABLED=false
|
||||||
|
|||||||
+12
-2
@@ -90,8 +90,18 @@ Scope requirements:
|
|||||||
- `create_milestone` (`owner`, `repo`, `title`, optional `description`, `due_on`)
|
- `create_milestone` (`owner`, `repo`, `title`, optional `description`, `due_on`)
|
||||||
- `edit_issue_comment` (`owner`, `repo`, `comment_id`, `body`)
|
- `edit_issue_comment` (`owner`, `repo`, `comment_id`, `body`)
|
||||||
|
|
||||||
Not supported by design: merge, branch/label/release deletion, force push, repo/admin
|
Not supported by the dedicated tools by design: merge, branch/label/release deletion,
|
||||||
management.
|
force push, repo/admin management. Endpoints not covered above are reachable through the
|
||||||
|
generic `gitea_request` escape hatch (subject to policy, write-mode, and a sensitive-path
|
||||||
|
denylist) — see [Raw API Dispatch](raw-api.md).
|
||||||
|
|
||||||
|
## Raw API Dispatch
|
||||||
|
|
||||||
|
- `gitea_request` (`method`, `path`, optional `query`, `body`)
|
||||||
|
- Calls an arbitrary Gitea REST endpoint. `GET`/`HEAD` are reads; other methods are
|
||||||
|
writes and require write-mode plus a whitelisted repository. Admin/credential
|
||||||
|
endpoints are blocked unless `RAW_API_ALLOW_SENSITIVE=true`. See
|
||||||
|
[Raw API Dispatch](raw-api.md) for the two-layer policy model and full details.
|
||||||
|
|
||||||
Note: `create_issue`, `add_labels`, and `remove_labels` accept label **names**; the
|
Note: `create_issue`, `add_labels`, and `remove_labels` accept label **names**; the
|
||||||
server resolves them to Gitea label ids and returns a clear error for unknown labels.
|
server resolves them to Gitea label ids and returns a clear error for unknown labels.
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ AegisGitea MCP acts as a secure bridge between AI assistants (such as Claude, Cl
|
|||||||
| [Getting Started](getting-started.md) | Installation and first-time setup |
|
| [Getting Started](getting-started.md) | Installation and first-time setup |
|
||||||
| [Configuration](configuration.md) | All environment variables and settings |
|
| [Configuration](configuration.md) | All environment variables and settings |
|
||||||
| [API Reference](api-reference.md) | HTTP endpoints and MCP tools |
|
| [API Reference](api-reference.md) | HTTP endpoints and MCP tools |
|
||||||
|
| [Raw API Dispatch](raw-api.md) | The generic `gitea_request` escape-hatch tool |
|
||||||
| [Architecture](architecture.md) | System design and data flow |
|
| [Architecture](architecture.md) | System design and data flow |
|
||||||
| [Security](security.md) | Authentication, rate limiting, and audit logging |
|
| [Security](security.md) | Authentication, rate limiting, and audit logging |
|
||||||
| [Deployment](deployment.md) | Docker and production deployment |
|
| [Deployment](deployment.md) | Docker and production deployment |
|
||||||
|
|||||||
+119
@@ -0,0 +1,119 @@
|
|||||||
|
# Raw API Dispatch (`gitea_request`)
|
||||||
|
|
||||||
|
`gitea_request` is a generic escape hatch that can call **any** Gitea REST
|
||||||
|
endpoint by method and path. It exists for the long tail of the Gitea API that
|
||||||
|
the curated, typed tools do not cover (merging PRs, reviews, writing files,
|
||||||
|
webhooks, branch/tag protections, collaborators, Actions/CI, packages,
|
||||||
|
notifications, and so on).
|
||||||
|
|
||||||
|
> Prefer the dedicated tools whenever one exists. Use `gitea_request` only for
|
||||||
|
> endpoints they do not cover. It is subject to policy, write-mode, and the
|
||||||
|
> sensitive-path denylist described below.
|
||||||
|
|
||||||
|
## Arguments
|
||||||
|
|
||||||
|
| Field | Type | Notes |
|
||||||
|
|-------|------|-------|
|
||||||
|
| `method` | enum | `GET`, `HEAD`, `POST`, `PUT`, `PATCH`, `DELETE` (case-insensitive). Any other method is rejected before any network call. |
|
||||||
|
| `path` | string | Gitea REST path. The `/api/v1` prefix is optional. A full URL may be supplied — the host and query string are stripped. |
|
||||||
|
| `query` | object | Optional query-string parameters. |
|
||||||
|
| `body` | object | Optional JSON request body. **Never logged.** |
|
||||||
|
|
||||||
|
The response is returned in a stable envelope:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"method": "GET",
|
||||||
|
"path": "/api/v1/repos/acme/app/pulls/1",
|
||||||
|
"write": false,
|
||||||
|
"repository": "acme/app",
|
||||||
|
"data": { "...": "..." }
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
List responses add `count` and `omitted`; oversized objects are returned as a
|
||||||
|
truncated JSON string with `"truncated": true`. All responses are bounded by
|
||||||
|
`MAX_TOOL_RESPONSE_ITEMS` / `MAX_TOOL_RESPONSE_CHARS`.
|
||||||
|
|
||||||
|
## Two-layer authorization
|
||||||
|
|
||||||
|
A single tool surface would normally collapse the granularity of `policy.yaml`.
|
||||||
|
To preserve it, every call is authorized twice:
|
||||||
|
|
||||||
|
1. **Central gate (`server.py`).** The registered `gitea_request` tool name is
|
||||||
|
allowed/denied like any other tool. In service-PAT mode the central gate also
|
||||||
|
parses the target repository from the path and verifies that the signed-in
|
||||||
|
user has permission on that repository before the service PAT is used.
|
||||||
|
2. **Handler gate (`raw_tools.py`).** The handler derives a coarse **virtual
|
||||||
|
tool name** of the form `gitea_request:<METHOD>:<top-path-segment>` (for
|
||||||
|
example `gitea_request:GET:repos` or `gitea_request:DELETE:repos`) and runs
|
||||||
|
it back through the policy engine with the parsed repository, target path, and
|
||||||
|
a `is_write` flag (`true` for any method other than GET/HEAD). This reuses the
|
||||||
|
existing write-mode + write-whitelist enforcement and lets `policy.yaml` allow
|
||||||
|
or deny raw dispatch per method and per top-level path segment.
|
||||||
|
|
||||||
|
Because the policy engine matches tool names by **exact set membership** (only
|
||||||
|
`paths` use globbing), the virtual name is deliberately coarse and stable.
|
||||||
|
|
||||||
|
### Example: lock raw dispatch to reads
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
tools:
|
||||||
|
deny:
|
||||||
|
- gitea_request:POST:repos
|
||||||
|
- gitea_request:PUT:repos
|
||||||
|
- gitea_request:PATCH:repos
|
||||||
|
- gitea_request:DELETE:repos
|
||||||
|
```
|
||||||
|
|
||||||
|
## Sensitive-path denylist
|
||||||
|
|
||||||
|
Independently of `policy.yaml`, the handler blocks endpoints that touch an
|
||||||
|
admin or credential surface **for every method, including GET** (a GET on these
|
||||||
|
already leaks credentials or privileged configuration):
|
||||||
|
|
||||||
|
- `/admin`
|
||||||
|
- `*tokens*`
|
||||||
|
- `*secrets*`
|
||||||
|
- `*hooks*`
|
||||||
|
- `*keys*` (and `*gpg_keys*`)
|
||||||
|
- `applications/oauth2`
|
||||||
|
- `actions/runners/registration-token`
|
||||||
|
|
||||||
|
This denylist lives in the handler and **cannot be re-opened from
|
||||||
|
`policy.yaml`.** It is overridden only by setting `RAW_API_ALLOW_SENSITIVE=true`.
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
| Variable | Default | Notes |
|
||||||
|
|----------|---------|-------|
|
||||||
|
| `RAW_API_ENABLED` | `true` | Killswitch. When `false`, `gitea_request` refuses every dispatch with a `403`. |
|
||||||
|
| `RAW_API_ALLOW_SENSITIVE` | `false` | When `true`, the admin/credential denylist is bypassed. Leave `false` unless you fully understand the exposure. |
|
||||||
|
|
||||||
|
## Security warning
|
||||||
|
|
||||||
|
> With `WRITE_MODE=true`, the **write whitelist is the only brake** on
|
||||||
|
> `POST`/`PUT`/`PATCH`/`DELETE` across the *entire* Gitea API surface reachable
|
||||||
|
> by `gitea_request`. Any write method against a whitelisted repository will be
|
||||||
|
> attempted. Keep the whitelist tight, prefer denying the write virtual tool
|
||||||
|
> names in `policy.yaml`, and keep `RAW_API_ALLOW_SENSITIVE=false`.
|
||||||
|
|
||||||
|
## Behavioral notes and edge cases
|
||||||
|
|
||||||
|
- **Full URL supplied instead of a path:** only the path is used; the host and
|
||||||
|
query string are discarded (`query` carries query parameters).
|
||||||
|
- **Path traversal (`..`):** rejected during argument validation (`400`).
|
||||||
|
- **Unknown / non-HTTP method:** rejected during argument validation, before any
|
||||||
|
network call.
|
||||||
|
- **Cross-repo endpoints** such as `/repos/search` and `/repos/issues/search`
|
||||||
|
are intentionally *not* treated as repository-scoped, so `repository` is
|
||||||
|
`null` for them.
|
||||||
|
- **Non-repository writes** such as `POST /user/repos` or `POST /orgs` are denied
|
||||||
|
with *"write operation requires a repository target"*. This is the secure
|
||||||
|
default — the per-user permission model is repository-scoped, so there is no
|
||||||
|
repository against which to verify the write. This behavior is intentional and
|
||||||
|
is not worked around.
|
||||||
|
- **Service-PAT mode:** non-repository endpoints (for example `GET /user/orgs`)
|
||||||
|
are denied by the central gate because per-user permission can only be verified
|
||||||
|
against a repository target. Use the dedicated tools for those, or run in
|
||||||
|
OAuth-only mode.
|
||||||
+15
@@ -4,5 +4,20 @@ defaults:
|
|||||||
|
|
||||||
tools:
|
tools:
|
||||||
deny: []
|
deny: []
|
||||||
|
# The generic `gitea_request` tool authorizes each call under a coarse virtual
|
||||||
|
# tool name of the form `gitea_request:<METHOD>:<top-path-segment>`, e.g.
|
||||||
|
# `gitea_request:GET:repos` or `gitea_request:DELETE:repos`. To keep raw
|
||||||
|
# dispatch read-only while still allowing GETs, deny the write methods here:
|
||||||
|
#
|
||||||
|
# deny:
|
||||||
|
# - gitea_request:POST:repos
|
||||||
|
# - gitea_request:PUT:repos
|
||||||
|
# - gitea_request:PATCH:repos
|
||||||
|
# - gitea_request:DELETE:repos
|
||||||
|
#
|
||||||
|
# NOTE: The admin/credential denylist (/admin, *tokens*, *secrets*, *hooks*,
|
||||||
|
# *keys*, applications/oauth2, runner registration tokens) is enforced in the
|
||||||
|
# handler independently of this file and is NOT configured here. It can only be
|
||||||
|
# overridden by setting RAW_API_ALLOW_SENSITIVE=true.
|
||||||
|
|
||||||
repositories: {}
|
repositories: {}
|
||||||
|
|||||||
@@ -211,6 +211,19 @@ class Settings(BaseSettings):
|
|||||||
"Disabled by default."
|
"Disabled by default."
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
# Raw API dispatch (gitea_request escape hatch)
|
||||||
|
raw_api_enabled: bool = Field(
|
||||||
|
default=True,
|
||||||
|
description="Enable the generic gitea_request raw API dispatch tool",
|
||||||
|
)
|
||||||
|
raw_api_allow_sensitive: bool = Field(
|
||||||
|
default=False,
|
||||||
|
description=(
|
||||||
|
"Allow gitea_request to reach admin/credential endpoints "
|
||||||
|
"(/admin, *tokens*, *secrets*, *hooks*, *keys*, applications/oauth2, "
|
||||||
|
"runner registration tokens). Disabled by default."
|
||||||
|
),
|
||||||
|
)
|
||||||
automation_enabled: bool = Field(
|
automation_enabled: bool = Field(
|
||||||
default=False,
|
default=False,
|
||||||
description="Enable automation endpoints and workflows",
|
description="Enable automation endpoints and workflows",
|
||||||
|
|||||||
@@ -148,6 +148,49 @@ class GiteaClient:
|
|||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
async def raw_request(
|
||||||
|
self,
|
||||||
|
method: str,
|
||||||
|
endpoint: str,
|
||||||
|
*,
|
||||||
|
params: dict[str, Any] | None = None,
|
||||||
|
json_body: dict[str, Any] | None = None,
|
||||||
|
) -> Any:
|
||||||
|
"""Dispatch an arbitrary Gitea REST request for the ``gitea_request`` tool.
|
||||||
|
|
||||||
|
Only the method and normalized endpoint are audited; the request body is
|
||||||
|
never logged so secrets embedded in payloads are not persisted.
|
||||||
|
"""
|
||||||
|
correlation_id = self.audit.log_tool_invocation(
|
||||||
|
tool_name="gitea_request",
|
||||||
|
params={"method": method, "path": endpoint},
|
||||||
|
result_status="pending",
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
result = await self._request(
|
||||||
|
method,
|
||||||
|
endpoint,
|
||||||
|
correlation_id=correlation_id,
|
||||||
|
params=params,
|
||||||
|
json_body=json_body,
|
||||||
|
)
|
||||||
|
self.audit.log_tool_invocation(
|
||||||
|
tool_name="gitea_request",
|
||||||
|
correlation_id=correlation_id,
|
||||||
|
result_status="success",
|
||||||
|
params={"method": method, "path": endpoint},
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
except Exception as exc:
|
||||||
|
self.audit.log_tool_invocation(
|
||||||
|
tool_name="gitea_request",
|
||||||
|
correlation_id=correlation_id,
|
||||||
|
result_status="error",
|
||||||
|
params={"method": method, "path": endpoint},
|
||||||
|
error=str(exc),
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
async def list_repositories(self) -> list[dict[str, Any]]:
|
async def list_repositories(self) -> list[dict[str, Any]]:
|
||||||
"""List repositories visible to the authenticated user."""
|
"""List repositories visible to the authenticated user."""
|
||||||
correlation_id = self.audit.log_tool_invocation(
|
correlation_id = self.audit.log_tool_invocation(
|
||||||
|
|||||||
@@ -718,6 +718,38 @@ AVAILABLE_TOOLS: list[MCPTool] = [
|
|||||||
},
|
},
|
||||||
write_operation=True,
|
write_operation=True,
|
||||||
),
|
),
|
||||||
|
_tool(
|
||||||
|
"gitea_request",
|
||||||
|
(
|
||||||
|
"Generic escape hatch that calls an arbitrary Gitea REST endpoint "
|
||||||
|
"(method + path). Prefer the dedicated tools; use this only for "
|
||||||
|
"endpoints they do not cover. Subject to policy, write-mode and the "
|
||||||
|
"sensitive-path denylist. Methods other than GET/HEAD are writes and "
|
||||||
|
"require write-mode plus a whitelisted repository."
|
||||||
|
),
|
||||||
|
{
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"method": {
|
||||||
|
"type": "string",
|
||||||
|
"enum": ["GET", "HEAD", "POST", "PUT", "PATCH", "DELETE"],
|
||||||
|
},
|
||||||
|
"path": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Gitea REST path, e.g. /repos/{owner}/{repo}/pulls/1/merge",
|
||||||
|
},
|
||||||
|
"query": {"type": "object", "description": "Optional query-string parameters"},
|
||||||
|
"body": {"type": "object", "description": "Optional JSON request body"},
|
||||||
|
},
|
||||||
|
"required": ["method", "path"],
|
||||||
|
"additionalProperties": False,
|
||||||
|
},
|
||||||
|
# write_operation is intentionally False: a static flag cannot describe a
|
||||||
|
# tool that is read OR write depending on the method. Setting it True
|
||||||
|
# would force the central write-mode gate on GETs and break reads. The
|
||||||
|
# handler is authoritative via its own per-method authorize() call.
|
||||||
|
write_operation=False,
|
||||||
|
),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -60,6 +60,7 @@ from aegis_gitea_mcp.request_context import (
|
|||||||
)
|
)
|
||||||
from aegis_gitea_mcp.security import sanitize_data
|
from aegis_gitea_mcp.security import sanitize_data
|
||||||
from aegis_gitea_mcp.tools.arguments import extract_repository, extract_target_path
|
from aegis_gitea_mcp.tools.arguments import extract_repository, extract_target_path
|
||||||
|
from aegis_gitea_mcp.tools.raw_tools import raw_api_request_tool
|
||||||
from aegis_gitea_mcp.tools.read_tools import (
|
from aegis_gitea_mcp.tools.read_tools import (
|
||||||
compare_refs_tool,
|
compare_refs_tool,
|
||||||
get_branch_tool,
|
get_branch_tool,
|
||||||
@@ -420,6 +421,9 @@ TOOL_HANDLERS: dict[str, ToolHandler] = {
|
|||||||
"create_branch": create_branch_tool,
|
"create_branch": create_branch_tool,
|
||||||
"create_milestone": create_milestone_tool,
|
"create_milestone": create_milestone_tool,
|
||||||
"edit_issue_comment": edit_issue_comment_tool,
|
"edit_issue_comment": edit_issue_comment_tool,
|
||||||
|
# Generic raw API dispatch (escape hatch). Registered as a read tool so GETs
|
||||||
|
# work without write-mode; the handler authorizes writes per-method itself.
|
||||||
|
"gitea_request": raw_api_request_tool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -2,7 +2,9 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from typing import Annotated, Literal
|
import re
|
||||||
|
from typing import Annotated, Any, Literal
|
||||||
|
from urllib.parse import urlsplit
|
||||||
|
|
||||||
from pydantic import (
|
from pydantic import (
|
||||||
AfterValidator,
|
AfterValidator,
|
||||||
@@ -10,6 +12,7 @@ from pydantic import (
|
|||||||
BeforeValidator,
|
BeforeValidator,
|
||||||
ConfigDict,
|
ConfigDict,
|
||||||
Field,
|
Field,
|
||||||
|
field_validator,
|
||||||
model_validator,
|
model_validator,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -446,6 +449,137 @@ class RepoTopicsArgs(RepositoryArgs):
|
|||||||
"""Arguments for list_repo_topics."""
|
"""Arguments for list_repo_topics."""
|
||||||
|
|
||||||
|
|
||||||
|
# --- Raw API dispatch (gitea_request escape hatch) -------------------------
|
||||||
|
|
||||||
|
# HTTP methods the generic dispatch tool accepts. Everything outside GET/HEAD is
|
||||||
|
# treated as a write so the policy/write-mode gate applies.
|
||||||
|
RAW_API_METHODS = ("GET", "HEAD", "POST", "PUT", "PATCH", "DELETE")
|
||||||
|
_RAW_WRITE_METHODS = frozenset({"POST", "PUT", "PATCH", "DELETE"})
|
||||||
|
|
||||||
|
# Path segments/subpaths blocked for *every* method unless explicitly overridden
|
||||||
|
# via RAW_API_ALLOW_SENSITIVE. A GET on these already leaks credentials or
|
||||||
|
# privileged configuration, so they are denied independently of policy.yaml.
|
||||||
|
_RAW_SENSITIVE_SEGMENTS = frozenset({"admin", "tokens", "secrets", "hooks", "keys", "gpg_keys"})
|
||||||
|
_RAW_SENSITIVE_SUBPATHS = ("applications/oauth2", "actions/runners/registration-token")
|
||||||
|
|
||||||
|
# Endpoints under /repos/ that are not scoped to a single repository.
|
||||||
|
_RAW_CROSS_REPO_OWNERS = frozenset({"search", "issues"})
|
||||||
|
|
||||||
|
# Resources whose trailing segments form a file path target for policy checks.
|
||||||
|
_RAW_FILE_RESOURCES = frozenset({"contents", "raw", "media"})
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_raw_endpoint(path: str) -> str:
|
||||||
|
"""Normalize a raw API path into an ``/api/v1``-prefixed endpoint.
|
||||||
|
|
||||||
|
Accepts a bare path (``/repos/o/r``), an already-prefixed path
|
||||||
|
(``/api/v1/repos/o/r``), or a full URL (the scheme/host and any query string
|
||||||
|
are stripped — the separate ``query`` argument carries query parameters).
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: When the path contains a ``..`` traversal segment.
|
||||||
|
"""
|
||||||
|
candidate = path.strip()
|
||||||
|
split = urlsplit(candidate)
|
||||||
|
# When a full URL is supplied, keep only its path component.
|
||||||
|
raw_path = split.path if (split.scheme or split.netloc) else candidate
|
||||||
|
# Drop any query/fragment a caller may have inlined into the path string.
|
||||||
|
raw_path = raw_path.split("?", 1)[0].split("#", 1)[0]
|
||||||
|
raw_path = raw_path.replace("\\", "/")
|
||||||
|
segments = [seg for seg in raw_path.split("/") if seg and seg != "."]
|
||||||
|
if any(seg == ".." for seg in segments):
|
||||||
|
raise ValueError("path must not contain '..' traversal segments")
|
||||||
|
rel_segments = segments[2:] if segments[:2] == ["api", "v1"] else segments
|
||||||
|
if not rel_segments:
|
||||||
|
return "/api/v1"
|
||||||
|
return "/api/v1/" + "/".join(rel_segments)
|
||||||
|
|
||||||
|
|
||||||
|
def _raw_relative_segments(endpoint: str) -> list[str]:
|
||||||
|
"""Return the endpoint segments after the ``/api/v1`` prefix."""
|
||||||
|
segments = [seg for seg in endpoint.split("/") if seg]
|
||||||
|
return segments[2:] if segments[:2] == ["api", "v1"] else segments
|
||||||
|
|
||||||
|
|
||||||
|
def raw_top_segment(endpoint: str) -> str:
|
||||||
|
"""Return the first path segment after ``/api/v1`` for coarse policy grouping."""
|
||||||
|
rel = _raw_relative_segments(endpoint)
|
||||||
|
return rel[0] if rel else ""
|
||||||
|
|
||||||
|
|
||||||
|
def raw_method_is_write(method: str) -> bool:
|
||||||
|
"""Return whether an HTTP method mutates state."""
|
||||||
|
return method.upper() in _RAW_WRITE_METHODS
|
||||||
|
|
||||||
|
|
||||||
|
def raw_is_sensitive(endpoint: str) -> bool:
|
||||||
|
"""Return whether an endpoint touches an admin/credential surface."""
|
||||||
|
rel = _raw_relative_segments(endpoint)
|
||||||
|
if any(seg in _RAW_SENSITIVE_SEGMENTS for seg in rel):
|
||||||
|
return True
|
||||||
|
joined = "/".join(rel)
|
||||||
|
return any(sub in joined for sub in _RAW_SENSITIVE_SUBPATHS)
|
||||||
|
|
||||||
|
|
||||||
|
def _raw_repo_segments(endpoint: str) -> list[str] | None:
|
||||||
|
"""Return ``[owner, repo, *rest]`` for a single-repository endpoint, else None."""
|
||||||
|
rel = _raw_relative_segments(endpoint)
|
||||||
|
if len(rel) < 3 or rel[0] != "repos":
|
||||||
|
return None
|
||||||
|
owner, repo = rel[1], rel[2]
|
||||||
|
if owner in _RAW_CROSS_REPO_OWNERS:
|
||||||
|
return None
|
||||||
|
if not (re.match(_REPO_PART_PATTERN, owner) and re.match(_REPO_PART_PATTERN, repo)):
|
||||||
|
return None
|
||||||
|
return [owner, repo, *rel[3:]]
|
||||||
|
|
||||||
|
|
||||||
|
def parse_raw_repository(endpoint: str) -> str | None:
|
||||||
|
"""Parse ``owner/repo`` from a repo-scoped endpoint; None for cross-repo paths."""
|
||||||
|
repo_segments = _raw_repo_segments(endpoint)
|
||||||
|
if repo_segments is None:
|
||||||
|
return None
|
||||||
|
return f"{repo_segments[0]}/{repo_segments[1]}"
|
||||||
|
|
||||||
|
|
||||||
|
def parse_raw_target_path(endpoint: str) -> str | None:
|
||||||
|
"""Parse a file-path target from ``contents``/``raw``/``media`` endpoints."""
|
||||||
|
repo_segments = _raw_repo_segments(endpoint)
|
||||||
|
if repo_segments is None or len(repo_segments) < 4:
|
||||||
|
return None
|
||||||
|
if repo_segments[2] not in _RAW_FILE_RESOURCES:
|
||||||
|
return None
|
||||||
|
file_path = "/".join(repo_segments[3:])
|
||||||
|
return file_path or None
|
||||||
|
|
||||||
|
|
||||||
|
class RawApiRequestArgs(StrictBaseModel):
|
||||||
|
"""Arguments for the generic ``gitea_request`` escape-hatch tool."""
|
||||||
|
|
||||||
|
method: Literal["GET", "HEAD", "POST", "PUT", "PATCH", "DELETE"] = Field(
|
||||||
|
..., description="HTTP method"
|
||||||
|
)
|
||||||
|
path: str = Field(..., min_length=1, max_length=2048, description="Gitea REST path")
|
||||||
|
query: dict[str, Any] | None = Field(
|
||||||
|
default=None, description="Optional query-string parameters"
|
||||||
|
)
|
||||||
|
body: dict[str, Any] | None = Field(default=None, description="Optional JSON request body")
|
||||||
|
|
||||||
|
@field_validator("method", mode="before")
|
||||||
|
@classmethod
|
||||||
|
def _normalize_method(cls, value: object) -> object:
|
||||||
|
"""Uppercase the method before enum validation so 'get' is accepted."""
|
||||||
|
if isinstance(value, str):
|
||||||
|
return value.strip().upper()
|
||||||
|
return value
|
||||||
|
|
||||||
|
@model_validator(mode="after")
|
||||||
|
def _validate_path(self) -> RawApiRequestArgs:
|
||||||
|
"""Reject path traversal up front so the handler sees a clean endpoint."""
|
||||||
|
normalize_raw_endpoint(self.path)
|
||||||
|
return self
|
||||||
|
|
||||||
|
|
||||||
def extract_repository(arguments: dict[str, object]) -> str | None:
|
def extract_repository(arguments: dict[str, object]) -> str | None:
|
||||||
"""Extract `owner/repo` from raw argument mapping.
|
"""Extract `owner/repo` from raw argument mapping.
|
||||||
|
|
||||||
@@ -459,6 +593,16 @@ def extract_repository(arguments: dict[str, object]) -> str | None:
|
|||||||
repo = arguments.get("repo")
|
repo = arguments.get("repo")
|
||||||
if isinstance(owner, str) and isinstance(repo, str) and owner and repo:
|
if isinstance(owner, str) and isinstance(repo, str) and owner and repo:
|
||||||
return f"{owner}/{repo}"
|
return f"{owner}/{repo}"
|
||||||
|
# Raw API dispatch: derive the repository from the request path so the central
|
||||||
|
# policy gate and the service-PAT per-user permission check evaluate the real
|
||||||
|
# target instead of treating every raw call as repo-less.
|
||||||
|
path = arguments.get("path")
|
||||||
|
method = arguments.get("method")
|
||||||
|
if isinstance(path, str) and isinstance(method, str):
|
||||||
|
try:
|
||||||
|
return parse_raw_repository(normalize_raw_endpoint(path))
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
@@ -467,4 +611,13 @@ def extract_target_path(arguments: dict[str, object]) -> str | None:
|
|||||||
filepath = arguments.get("filepath")
|
filepath = arguments.get("filepath")
|
||||||
if isinstance(filepath, str) and filepath:
|
if isinstance(filepath, str) and filepath:
|
||||||
return filepath
|
return filepath
|
||||||
|
# Raw API dispatch: expose the file path embedded in contents/raw/media
|
||||||
|
# endpoints so repository path allow/deny rules still apply to raw calls.
|
||||||
|
path = arguments.get("path")
|
||||||
|
method = arguments.get("method")
|
||||||
|
if isinstance(path, str) and isinstance(method, str):
|
||||||
|
try:
|
||||||
|
return parse_raw_target_path(normalize_raw_endpoint(path))
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
return None
|
return None
|
||||||
|
|||||||
@@ -0,0 +1,129 @@
|
|||||||
|
"""Generic raw Gitea REST dispatch tool (escape hatch).
|
||||||
|
|
||||||
|
``gitea_request`` exposes the long tail of the Gitea API that the curated, typed
|
||||||
|
tools do not cover. A single tool surface would normally collapse the
|
||||||
|
granularity of ``policy.yaml``, so this handler re-derives a coarse virtual tool
|
||||||
|
name (``gitea_request:<METHOD>:<top-segment>``) and the target repository/path
|
||||||
|
from each request and runs them back through the policy engine. That reuses the
|
||||||
|
existing write-mode + write-whitelist enforcement and keeps per-method/per-repo
|
||||||
|
policy control intact behind the single tool.
|
||||||
|
|
||||||
|
Two layers of authorization apply:
|
||||||
|
|
||||||
|
* The central dispatch gate in ``server.py`` allows/denies the registered
|
||||||
|
``gitea_request`` name and, in service-PAT mode, verifies the signed-in user's
|
||||||
|
permission on the parsed repository.
|
||||||
|
* This handler then authorizes the fine-grained virtual tool name and enforces a
|
||||||
|
built-in admin/credential denylist that ``policy.yaml`` cannot re-open.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from fastapi import HTTPException
|
||||||
|
|
||||||
|
from aegis_gitea_mcp.audit import get_audit_logger
|
||||||
|
from aegis_gitea_mcp.config import get_settings
|
||||||
|
from aegis_gitea_mcp.gitea_client import (
|
||||||
|
GiteaAuthenticationError,
|
||||||
|
GiteaAuthorizationError,
|
||||||
|
GiteaClient,
|
||||||
|
GiteaError,
|
||||||
|
)
|
||||||
|
from aegis_gitea_mcp.policy import get_policy_engine
|
||||||
|
from aegis_gitea_mcp.response_limits import limit_items, limit_text
|
||||||
|
from aegis_gitea_mcp.tools.arguments import (
|
||||||
|
RawApiRequestArgs,
|
||||||
|
normalize_raw_endpoint,
|
||||||
|
parse_raw_repository,
|
||||||
|
parse_raw_target_path,
|
||||||
|
raw_is_sensitive,
|
||||||
|
raw_method_is_write,
|
||||||
|
raw_top_segment,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _bound_response(data: Any) -> dict[str, Any]:
|
||||||
|
"""Bound a raw response into stable, size-limited envelope fields."""
|
||||||
|
if isinstance(data, list):
|
||||||
|
bounded, omitted = limit_items(list(data))
|
||||||
|
return {"data": bounded, "count": len(bounded), "omitted": omitted}
|
||||||
|
if isinstance(data, dict):
|
||||||
|
serialized = json.dumps(data, ensure_ascii=False, default=str)
|
||||||
|
capped = limit_text(serialized)
|
||||||
|
if len(capped) < len(serialized):
|
||||||
|
# Oversized dict: return a truncated JSON string instead of the object.
|
||||||
|
return {"data": capped, "truncated": True}
|
||||||
|
return {"data": data, "truncated": False}
|
||||||
|
if isinstance(data, str):
|
||||||
|
return {"data": limit_text(data)}
|
||||||
|
return {"data": data}
|
||||||
|
|
||||||
|
|
||||||
|
async def raw_api_request_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
|
||||||
|
"""Dispatch an arbitrary Gitea REST endpoint subject to policy and denylists."""
|
||||||
|
settings = get_settings()
|
||||||
|
audit = get_audit_logger()
|
||||||
|
|
||||||
|
if not settings.raw_api_enabled:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=403,
|
||||||
|
detail="Raw API dispatch is disabled (set RAW_API_ENABLED=true to enable).",
|
||||||
|
)
|
||||||
|
|
||||||
|
parsed = RawApiRequestArgs.model_validate(arguments)
|
||||||
|
method = parsed.method
|
||||||
|
endpoint = normalize_raw_endpoint(parsed.path)
|
||||||
|
is_write = raw_method_is_write(method)
|
||||||
|
|
||||||
|
# Admin/credential denylist applies to every method and cannot be re-opened
|
||||||
|
# from policy.yaml — only RAW_API_ALLOW_SENSITIVE overrides it.
|
||||||
|
if raw_is_sensitive(endpoint) and not settings.raw_api_allow_sensitive:
|
||||||
|
audit.log_access_denied(tool_name="gitea_request", reason="raw_sensitive_path_denied")
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=403,
|
||||||
|
detail=(
|
||||||
|
"Endpoint targets an admin/credential surface blocked by the raw-API "
|
||||||
|
"sensitive-path denylist."
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
repository = parse_raw_repository(endpoint)
|
||||||
|
target_path = parse_raw_target_path(endpoint)
|
||||||
|
|
||||||
|
# Coarse, stable virtual tool name so policy.yaml can allow/deny by method +
|
||||||
|
# top-level path segment (policy matches tool names by exact set membership).
|
||||||
|
policy_tool_name = f"gitea_request:{method}:{raw_top_segment(endpoint)}"
|
||||||
|
decision = get_policy_engine().authorize(
|
||||||
|
tool_name=policy_tool_name,
|
||||||
|
is_write=is_write,
|
||||||
|
repository=repository,
|
||||||
|
target_path=target_path,
|
||||||
|
)
|
||||||
|
if not decision.allowed:
|
||||||
|
audit.log_access_denied(
|
||||||
|
tool_name=policy_tool_name,
|
||||||
|
repository=repository,
|
||||||
|
reason=decision.reason,
|
||||||
|
)
|
||||||
|
raise HTTPException(status_code=403, detail=f"Policy denied raw request: {decision.reason}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
data = await gitea.raw_request(method, endpoint, params=parsed.query, json_body=parsed.body)
|
||||||
|
except (GiteaAuthenticationError, GiteaAuthorizationError):
|
||||||
|
# Let auth/authz failures surface so the server returns actionable
|
||||||
|
# re-authorization guidance instead of a generic internal error.
|
||||||
|
raise
|
||||||
|
except GiteaError as exc:
|
||||||
|
raise RuntimeError(f"Raw API request failed: {exc}") from exc
|
||||||
|
|
||||||
|
envelope: dict[str, Any] = {
|
||||||
|
"method": method,
|
||||||
|
"path": endpoint,
|
||||||
|
"write": is_write,
|
||||||
|
"repository": repository,
|
||||||
|
}
|
||||||
|
envelope.update(_bound_response(data))
|
||||||
|
return envelope
|
||||||
@@ -0,0 +1,321 @@
|
|||||||
|
"""Tests for the generic gitea_request raw API dispatch tool."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from fastapi import HTTPException
|
||||||
|
from pydantic import ValidationError
|
||||||
|
|
||||||
|
from aegis_gitea_mcp.config import reset_settings
|
||||||
|
from aegis_gitea_mcp.tools.arguments import (
|
||||||
|
extract_repository,
|
||||||
|
extract_target_path,
|
||||||
|
normalize_raw_endpoint,
|
||||||
|
parse_raw_repository,
|
||||||
|
parse_raw_target_path,
|
||||||
|
raw_is_sensitive,
|
||||||
|
raw_top_segment,
|
||||||
|
)
|
||||||
|
from aegis_gitea_mcp.tools.raw_tools import raw_api_request_tool
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def raw_env(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
|
||||||
|
"""Minimal API-key-mode settings with policy that allows reads, denies writes."""
|
||||||
|
reset_settings()
|
||||||
|
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
||||||
|
monkeypatch.setenv("GITEA_TOKEN", "test-token")
|
||||||
|
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
|
||||||
|
monkeypatch.setenv("ENVIRONMENT", "test")
|
||||||
|
# Point at a non-existent policy file so the default config applies
|
||||||
|
# (read: allow, write: deny) and tests do not depend on the repo policy.yaml.
|
||||||
|
monkeypatch.setenv("POLICY_FILE_PATH", str(tmp_path / "missing-policy.yaml"))
|
||||||
|
|
||||||
|
|
||||||
|
class StubRawGitea:
|
||||||
|
"""Stub Gitea client capturing raw_request calls."""
|
||||||
|
|
||||||
|
def __init__(self, response: Any = None) -> None:
|
||||||
|
self._response: Any = {"ok": True} if response is None else response
|
||||||
|
self.calls: list[dict[str, Any]] = []
|
||||||
|
|
||||||
|
async def raw_request(
|
||||||
|
self,
|
||||||
|
method: str,
|
||||||
|
endpoint: str,
|
||||||
|
*,
|
||||||
|
params: dict[str, Any] | None = None,
|
||||||
|
json_body: dict[str, Any] | None = None,
|
||||||
|
) -> Any:
|
||||||
|
self.calls.append(
|
||||||
|
{"method": method, "endpoint": endpoint, "params": params, "json_body": json_body}
|
||||||
|
)
|
||||||
|
return self._response
|
||||||
|
|
||||||
|
|
||||||
|
# --- Handler behavior ------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
async def test_get_repo_endpoint_allowed_and_parses_repository(raw_env: None) -> None:
|
||||||
|
"""A GET on a repo endpoint is allowed and parses owner/repo from the path."""
|
||||||
|
stub = StubRawGitea({"number": 1})
|
||||||
|
result = await raw_api_request_tool(stub, {"method": "GET", "path": "/repos/acme/app/pulls/1"})
|
||||||
|
|
||||||
|
assert result["method"] == "GET"
|
||||||
|
assert result["path"] == "/api/v1/repos/acme/app/pulls/1"
|
||||||
|
assert result["write"] is False
|
||||||
|
assert result["repository"] == "acme/app"
|
||||||
|
assert result["data"] == {"number": 1}
|
||||||
|
assert stub.calls[0]["endpoint"] == "/api/v1/repos/acme/app/pulls/1"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_lowercase_method_is_normalized(raw_env: None) -> None:
|
||||||
|
"""A lowercase method is uppercased and accepted."""
|
||||||
|
stub = StubRawGitea([{"id": 1}])
|
||||||
|
result = await raw_api_request_tool(stub, {"method": "get", "path": "/repos/acme/app/issues"})
|
||||||
|
assert result["method"] == "GET"
|
||||||
|
assert result["count"] == 1
|
||||||
|
|
||||||
|
|
||||||
|
async def test_delete_denied_when_write_mode_off(raw_env: None) -> None:
|
||||||
|
"""A write method is denied (no network call) while write-mode is disabled."""
|
||||||
|
stub = StubRawGitea()
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
await raw_api_request_tool(stub, {"method": "DELETE", "path": "/repos/acme/app/issues/1"})
|
||||||
|
|
||||||
|
assert exc_info.value.status_code == 403
|
||||||
|
assert "write mode is disabled" in str(exc_info.value.detail)
|
||||||
|
assert stub.calls == []
|
||||||
|
|
||||||
|
|
||||||
|
async def test_write_allowed_with_write_mode_and_whitelist(
|
||||||
|
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
|
||||||
|
) -> None:
|
||||||
|
"""A write succeeds only when write-mode is on, the repo is whitelisted, and policy allows."""
|
||||||
|
policy_file = tmp_path / "policy.yaml"
|
||||||
|
policy_file.write_text("defaults:\n read: allow\n write: allow\n", encoding="utf-8")
|
||||||
|
|
||||||
|
reset_settings()
|
||||||
|
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
||||||
|
monkeypatch.setenv("GITEA_TOKEN", "test-token")
|
||||||
|
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
|
||||||
|
monkeypatch.setenv("ENVIRONMENT", "test")
|
||||||
|
monkeypatch.setenv("POLICY_FILE_PATH", str(policy_file))
|
||||||
|
monkeypatch.setenv("WRITE_MODE", "true")
|
||||||
|
monkeypatch.setenv("WRITE_REPOSITORY_WHITELIST", "acme/app")
|
||||||
|
|
||||||
|
stub = StubRawGitea({"merged": True})
|
||||||
|
result = await raw_api_request_tool(
|
||||||
|
stub,
|
||||||
|
{"method": "PUT", "path": "/repos/acme/app/pulls/1/merge", "body": {"Do": "merge"}},
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result["write"] is True
|
||||||
|
assert result["repository"] == "acme/app"
|
||||||
|
assert stub.calls[0]["json_body"] == {"Do": "merge"}
|
||||||
|
|
||||||
|
|
||||||
|
async def test_write_denied_for_repo_outside_whitelist(
|
||||||
|
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
|
||||||
|
) -> None:
|
||||||
|
"""A write on a repo not in the whitelist is denied even with write-mode on."""
|
||||||
|
policy_file = tmp_path / "policy.yaml"
|
||||||
|
policy_file.write_text("defaults:\n read: allow\n write: allow\n", encoding="utf-8")
|
||||||
|
|
||||||
|
reset_settings()
|
||||||
|
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
||||||
|
monkeypatch.setenv("GITEA_TOKEN", "test-token")
|
||||||
|
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
|
||||||
|
monkeypatch.setenv("ENVIRONMENT", "test")
|
||||||
|
monkeypatch.setenv("POLICY_FILE_PATH", str(policy_file))
|
||||||
|
monkeypatch.setenv("WRITE_MODE", "true")
|
||||||
|
monkeypatch.setenv("WRITE_REPOSITORY_WHITELIST", "acme/other")
|
||||||
|
|
||||||
|
stub = StubRawGitea()
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
await raw_api_request_tool(stub, {"method": "POST", "path": "/repos/acme/app/issues"})
|
||||||
|
|
||||||
|
assert exc_info.value.status_code == 403
|
||||||
|
assert "whitelist" in str(exc_info.value.detail)
|
||||||
|
assert stub.calls == []
|
||||||
|
|
||||||
|
|
||||||
|
async def test_non_repository_write_denied(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
|
||||||
|
"""A write that targets no repository is denied (secure default)."""
|
||||||
|
policy_file = tmp_path / "policy.yaml"
|
||||||
|
policy_file.write_text("defaults:\n read: allow\n write: allow\n", encoding="utf-8")
|
||||||
|
|
||||||
|
reset_settings()
|
||||||
|
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
||||||
|
monkeypatch.setenv("GITEA_TOKEN", "test-token")
|
||||||
|
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
|
||||||
|
monkeypatch.setenv("ENVIRONMENT", "test")
|
||||||
|
monkeypatch.setenv("POLICY_FILE_PATH", str(policy_file))
|
||||||
|
monkeypatch.setenv("WRITE_MODE", "true")
|
||||||
|
monkeypatch.setenv("WRITE_REPOSITORY_WHITELIST", "acme/app")
|
||||||
|
|
||||||
|
stub = StubRawGitea()
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
await raw_api_request_tool(stub, {"method": "POST", "path": "/user/repos"})
|
||||||
|
|
||||||
|
assert exc_info.value.status_code == 403
|
||||||
|
assert "repository target" in str(exc_info.value.detail)
|
||||||
|
assert stub.calls == []
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"path",
|
||||||
|
["/admin/users", "/users/bob/tokens", "/repos/acme/app/hooks", "/user/keys"],
|
||||||
|
)
|
||||||
|
async def test_sensitive_paths_denied_on_get(raw_env: None, path: str) -> None:
|
||||||
|
"""Admin/credential surfaces are denied for every method, including GET."""
|
||||||
|
stub = StubRawGitea()
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
await raw_api_request_tool(stub, {"method": "GET", "path": path})
|
||||||
|
|
||||||
|
assert exc_info.value.status_code == 403
|
||||||
|
assert "sensitive-path denylist" in str(exc_info.value.detail)
|
||||||
|
assert stub.calls == []
|
||||||
|
|
||||||
|
|
||||||
|
async def test_sensitive_path_allowed_with_override(
|
||||||
|
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
|
||||||
|
) -> None:
|
||||||
|
"""RAW_API_ALLOW_SENSITIVE bypasses the admin/credential denylist."""
|
||||||
|
reset_settings()
|
||||||
|
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
||||||
|
monkeypatch.setenv("GITEA_TOKEN", "test-token")
|
||||||
|
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
|
||||||
|
monkeypatch.setenv("ENVIRONMENT", "test")
|
||||||
|
monkeypatch.setenv("POLICY_FILE_PATH", str(tmp_path / "missing.yaml"))
|
||||||
|
monkeypatch.setenv("RAW_API_ALLOW_SENSITIVE", "true")
|
||||||
|
|
||||||
|
stub = StubRawGitea([{"id": 1}])
|
||||||
|
result = await raw_api_request_tool(stub, {"method": "GET", "path": "/admin/users"})
|
||||||
|
assert result["data"] == [{"id": 1}]
|
||||||
|
assert stub.calls[0]["endpoint"] == "/api/v1/admin/users"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_cross_repo_search_not_treated_as_repository(raw_env: None) -> None:
|
||||||
|
"""/repos/issues/search is a cross-repo endpoint, so repository is None."""
|
||||||
|
stub = StubRawGitea([{"id": 1}])
|
||||||
|
result = await raw_api_request_tool(
|
||||||
|
stub, {"method": "GET", "path": "/repos/issues/search", "query": {"q": "bug"}}
|
||||||
|
)
|
||||||
|
assert result["repository"] is None
|
||||||
|
assert result["count"] == 1
|
||||||
|
assert stub.calls[0]["params"] == {"q": "bug"}
|
||||||
|
|
||||||
|
|
||||||
|
async def test_unknown_method_rejected_before_network(raw_env: None) -> None:
|
||||||
|
"""An unknown HTTP method is rejected during validation before any network call."""
|
||||||
|
stub = StubRawGitea()
|
||||||
|
with pytest.raises(ValidationError):
|
||||||
|
await raw_api_request_tool(stub, {"method": "OPTIONS", "path": "/repos/acme/app"})
|
||||||
|
assert stub.calls == []
|
||||||
|
|
||||||
|
|
||||||
|
async def test_path_traversal_rejected(raw_env: None) -> None:
|
||||||
|
"""A path containing '..' is rejected during validation."""
|
||||||
|
stub = StubRawGitea()
|
||||||
|
with pytest.raises(ValidationError):
|
||||||
|
await raw_api_request_tool(
|
||||||
|
stub, {"method": "GET", "path": "/repos/acme/app/../../admin/users"}
|
||||||
|
)
|
||||||
|
assert stub.calls == []
|
||||||
|
|
||||||
|
|
||||||
|
async def test_full_url_is_reduced_to_path(raw_env: None) -> None:
|
||||||
|
"""A full URL is reduced to just the API path."""
|
||||||
|
stub = StubRawGitea({"name": "app"})
|
||||||
|
result = await raw_api_request_tool(
|
||||||
|
stub,
|
||||||
|
{
|
||||||
|
"method": "GET",
|
||||||
|
"path": "https://gitea.example.com/api/v1/repos/acme/app/contents/src/app.py?ref=main",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert result["path"] == "/api/v1/repos/acme/app/contents/src/app.py"
|
||||||
|
assert result["repository"] == "acme/app"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_raw_api_disabled(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
|
||||||
|
"""The killswitch disables every dispatch."""
|
||||||
|
reset_settings()
|
||||||
|
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
||||||
|
monkeypatch.setenv("GITEA_TOKEN", "test-token")
|
||||||
|
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
|
||||||
|
monkeypatch.setenv("ENVIRONMENT", "test")
|
||||||
|
monkeypatch.setenv("POLICY_FILE_PATH", str(tmp_path / "missing.yaml"))
|
||||||
|
monkeypatch.setenv("RAW_API_ENABLED", "false")
|
||||||
|
|
||||||
|
stub = StubRawGitea()
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
await raw_api_request_tool(stub, {"method": "GET", "path": "/repos/acme/app"})
|
||||||
|
assert exc_info.value.status_code == 403
|
||||||
|
assert "disabled" in str(exc_info.value.detail)
|
||||||
|
assert stub.calls == []
|
||||||
|
|
||||||
|
|
||||||
|
async def test_large_dict_response_is_truncated(raw_env: None) -> None:
|
||||||
|
"""An oversized object response is returned as a truncated JSON string."""
|
||||||
|
big = {"blob": "x" * 50_000}
|
||||||
|
stub = StubRawGitea(big)
|
||||||
|
result = await raw_api_request_tool(stub, {"method": "GET", "path": "/repos/acme/app"})
|
||||||
|
assert result["truncated"] is True
|
||||||
|
assert isinstance(result["data"], str)
|
||||||
|
|
||||||
|
|
||||||
|
# --- Path parsing helpers --------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
("path", "expected"),
|
||||||
|
[
|
||||||
|
("/repos/acme/app", "/api/v1/repos/acme/app"),
|
||||||
|
("repos/acme/app", "/api/v1/repos/acme/app"),
|
||||||
|
("/api/v1/repos/acme/app", "/api/v1/repos/acme/app"),
|
||||||
|
("/", "/api/v1"),
|
||||||
|
("", "/api/v1"),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_normalize_raw_endpoint(path: str, expected: str) -> None:
|
||||||
|
assert normalize_raw_endpoint(path) == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_normalize_raw_endpoint_rejects_traversal() -> None:
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
normalize_raw_endpoint("/repos/acme/../admin")
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_raw_repository_variants() -> None:
|
||||||
|
assert parse_raw_repository("/api/v1/repos/acme/app/pulls/1") == "acme/app"
|
||||||
|
assert parse_raw_repository("/api/v1/repos/search") is None
|
||||||
|
assert parse_raw_repository("/api/v1/repos/issues/search") is None
|
||||||
|
assert parse_raw_repository("/api/v1/user/repos") is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_raw_target_path() -> None:
|
||||||
|
assert parse_raw_target_path("/api/v1/repos/acme/app/contents/src/app.py") == "src/app.py"
|
||||||
|
assert parse_raw_target_path("/api/v1/repos/acme/app/raw/README.md") == "README.md"
|
||||||
|
assert parse_raw_target_path("/api/v1/repos/acme/app/pulls/1") is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_raw_top_segment_and_sensitivity() -> None:
|
||||||
|
assert raw_top_segment("/api/v1/repos/acme/app") == "repos"
|
||||||
|
assert raw_top_segment("/api/v1") == ""
|
||||||
|
assert raw_is_sensitive("/api/v1/repos/acme/app/hooks") is True
|
||||||
|
assert raw_is_sensitive("/api/v1/user/applications/oauth2") is True
|
||||||
|
assert raw_is_sensitive("/api/v1/repos/acme/app/pulls") is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_extractors_are_raw_aware() -> None:
|
||||||
|
raw_args = {"method": "GET", "path": "/repos/acme/app/contents/src/app.py"}
|
||||||
|
assert extract_repository(raw_args) == "acme/app"
|
||||||
|
assert extract_target_path(raw_args) == "src/app.py"
|
||||||
|
# Malformed raw path must not raise from the extractors.
|
||||||
|
assert extract_repository({"method": "GET", "path": "/repos/acme/../x"}) is None
|
||||||
|
assert extract_target_path({"method": "GET", "path": "/repos/acme/../x"}) is None
|
||||||
Reference in New Issue
Block a user