Compare commits

...

4 Commits

Author SHA1 Message Date
Latte 7f7aaab5a6 test(raw-api): cover gitea_request handler and path parsing
docker / lint (push) Successful in 38s
docker / test (push) Successful in 33s
docker / test (pull_request) Successful in 32s
test / test (push) Successful in 40s
lint / lint (push) Successful in 42s
docker / lint (pull_request) Successful in 39s
test / test (pull_request) Successful in 39s
lint / lint (pull_request) Successful in 40s
docker / docker (pull_request) Successful in 31s
docker / docker (push) Successful in 44s
Covers read allow + repository parsing, write denied without write-mode, write
allowed only for whitelisted repos, non-repo write denial, sensitive-path
denial (incl. GET) and override, cross-repo search handling, unknown-method and
traversal rejection before any network call, killswitch, response truncation,
and the raw path-parsing helpers and raw-aware extractors.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 12:26:57 +02:00
Latte 8c84d76bd5 docs(raw-api): document gitea_request, env vars and policy examples
Adds docs/raw-api.md (two-layer policy, sensitive denylist, env vars, write-mode
warning), links it from index and api-reference, documents RAW_API_ENABLED /
RAW_API_ALLOW_SENSITIVE in .env.example, and adds commented virtual-tool-name
deny examples to policy.yaml.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 12:26:57 +02:00
Latte 8e41fd12af feat(raw-api): register gitea_request tool and wire server dispatch
Registers gitea_request in AVAILABLE_TOOLS with write_operation=False
(deliberate: a static flag cannot describe a read-or-write tool; the handler
authorizes writes per-method) and maps the tool name to raw_api_request_tool in
the server handler registry.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 12:26:56 +02:00
Latte 2844c42ec8 feat(raw-api): add gitea_request schema, path parsing, client dispatch and handler
Adds the RawApiRequestArgs schema (extra=forbid), raw path normalization/
parsing helpers, a GiteaClient.raw_request that audits method+path only (never
the body), and the raw_api_request_tool handler. The handler derives a coarse
virtual tool name (gitea_request:METHOD:topsegment) plus repository/target_path
from the path and runs them back through the policy engine, enforces an
admin/credential sensitive-path denylist, and bounds responses. Two config
flags gate it: RAW_API_ENABLED (killswitch) and RAW_API_ALLOW_SENSITIVE.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 12:26:26 +02:00
12 changed files with 854 additions and 3 deletions
+11
View File
@@ -63,6 +63,17 @@ WRITE_MODE=false
WRITE_REPOSITORY_WHITELIST=
WRITE_ALLOW_ALL_TOKEN_REPOS=false
# Raw API dispatch (gitea_request escape hatch). See docs/raw-api.md.
# gitea_request can call any Gitea REST endpoint (method + path). It is still
# subject to policy.yaml, WRITE_MODE + the write whitelist, and a built-in
# admin/credential denylist. Set RAW_API_ENABLED=false to remove the tool's
# ability to dispatch entirely.
RAW_API_ENABLED=true
# Allow gitea_request to reach admin/credential surfaces (/admin, *tokens*,
# *secrets*, *hooks*, *keys*, applications/oauth2, runner registration tokens).
# Leave false unless you fully understand the exposure.
RAW_API_ALLOW_SENSITIVE=false
# Automation mode (disabled by default)
AUTOMATION_ENABLED=false
AUTOMATION_SCHEDULER_ENABLED=false
+12 -2
View File
@@ -90,8 +90,18 @@ Scope requirements:
- `create_milestone` (`owner`, `repo`, `title`, optional `description`, `due_on`)
- `edit_issue_comment` (`owner`, `repo`, `comment_id`, `body`)
Not supported by design: merge, branch/label/release deletion, force push, repo/admin
management.
Not supported by the dedicated tools by design: merge, branch/label/release deletion,
force push, repo/admin management. Endpoints not covered above are reachable through the
generic `gitea_request` escape hatch (subject to policy, write-mode, and a sensitive-path
denylist) — see [Raw API Dispatch](raw-api.md).
## Raw API Dispatch
- `gitea_request` (`method`, `path`, optional `query`, `body`)
- Calls an arbitrary Gitea REST endpoint. `GET`/`HEAD` are reads; other methods are
writes and require write-mode plus a whitelisted repository. Admin/credential
endpoints are blocked unless `RAW_API_ALLOW_SENSITIVE=true`. See
[Raw API Dispatch](raw-api.md) for the two-layer policy model and full details.
Note: `create_issue`, `add_labels`, and `remove_labels` accept label **names**; the
server resolves them to Gitea label ids and returns a clear error for unknown labels.
+1
View File
@@ -17,6 +17,7 @@ AegisGitea MCP acts as a secure bridge between AI assistants (such as Claude, Cl
| [Getting Started](getting-started.md) | Installation and first-time setup |
| [Configuration](configuration.md) | All environment variables and settings |
| [API Reference](api-reference.md) | HTTP endpoints and MCP tools |
| [Raw API Dispatch](raw-api.md) | The generic `gitea_request` escape-hatch tool |
| [Architecture](architecture.md) | System design and data flow |
| [Security](security.md) | Authentication, rate limiting, and audit logging |
| [Deployment](deployment.md) | Docker and production deployment |
+119
View File
@@ -0,0 +1,119 @@
# Raw API Dispatch (`gitea_request`)
`gitea_request` is a generic escape hatch that can call **any** Gitea REST
endpoint by method and path. It exists for the long tail of the Gitea API that
the curated, typed tools do not cover (merging PRs, reviews, writing files,
webhooks, branch/tag protections, collaborators, Actions/CI, packages,
notifications, and so on).
> Prefer the dedicated tools whenever one exists. Use `gitea_request` only for
> endpoints they do not cover. It is subject to policy, write-mode, and the
> sensitive-path denylist described below.
## Arguments
| Field | Type | Notes |
|-------|------|-------|
| `method` | enum | `GET`, `HEAD`, `POST`, `PUT`, `PATCH`, `DELETE` (case-insensitive). Any other method is rejected before any network call. |
| `path` | string | Gitea REST path. The `/api/v1` prefix is optional. A full URL may be supplied — the host and query string are stripped. |
| `query` | object | Optional query-string parameters. |
| `body` | object | Optional JSON request body. **Never logged.** |
The response is returned in a stable envelope:
```json
{
"method": "GET",
"path": "/api/v1/repos/acme/app/pulls/1",
"write": false,
"repository": "acme/app",
"data": { "...": "..." }
}
```
List responses add `count` and `omitted`; oversized objects are returned as a
truncated JSON string with `"truncated": true`. All responses are bounded by
`MAX_TOOL_RESPONSE_ITEMS` / `MAX_TOOL_RESPONSE_CHARS`.
## Two-layer authorization
A single tool surface would normally collapse the granularity of `policy.yaml`.
To preserve it, every call is authorized twice:
1. **Central gate (`server.py`).** The registered `gitea_request` tool name is
allowed/denied like any other tool. In service-PAT mode the central gate also
parses the target repository from the path and verifies that the signed-in
user has permission on that repository before the service PAT is used.
2. **Handler gate (`raw_tools.py`).** The handler derives a coarse **virtual
tool name** of the form `gitea_request:<METHOD>:<top-path-segment>` (for
example `gitea_request:GET:repos` or `gitea_request:DELETE:repos`) and runs
it back through the policy engine with the parsed repository, target path, and
a `is_write` flag (`true` for any method other than GET/HEAD). This reuses the
existing write-mode + write-whitelist enforcement and lets `policy.yaml` allow
or deny raw dispatch per method and per top-level path segment.
Because the policy engine matches tool names by **exact set membership** (only
`paths` use globbing), the virtual name is deliberately coarse and stable.
### Example: lock raw dispatch to reads
```yaml
tools:
deny:
- gitea_request:POST:repos
- gitea_request:PUT:repos
- gitea_request:PATCH:repos
- gitea_request:DELETE:repos
```
## Sensitive-path denylist
Independently of `policy.yaml`, the handler blocks endpoints that touch an
admin or credential surface **for every method, including GET** (a GET on these
already leaks credentials or privileged configuration):
- `/admin`
- `*tokens*`
- `*secrets*`
- `*hooks*`
- `*keys*` (and `*gpg_keys*`)
- `applications/oauth2`
- `actions/runners/registration-token`
This denylist lives in the handler and **cannot be re-opened from
`policy.yaml`.** It is overridden only by setting `RAW_API_ALLOW_SENSITIVE=true`.
## Configuration
| Variable | Default | Notes |
|----------|---------|-------|
| `RAW_API_ENABLED` | `true` | Killswitch. When `false`, `gitea_request` refuses every dispatch with a `403`. |
| `RAW_API_ALLOW_SENSITIVE` | `false` | When `true`, the admin/credential denylist is bypassed. Leave `false` unless you fully understand the exposure. |
## Security warning
> With `WRITE_MODE=true`, the **write whitelist is the only brake** on
> `POST`/`PUT`/`PATCH`/`DELETE` across the *entire* Gitea API surface reachable
> by `gitea_request`. Any write method against a whitelisted repository will be
> attempted. Keep the whitelist tight, prefer denying the write virtual tool
> names in `policy.yaml`, and keep `RAW_API_ALLOW_SENSITIVE=false`.
## Behavioral notes and edge cases
- **Full URL supplied instead of a path:** only the path is used; the host and
query string are discarded (`query` carries query parameters).
- **Path traversal (`..`):** rejected during argument validation (`400`).
- **Unknown / non-HTTP method:** rejected during argument validation, before any
network call.
- **Cross-repo endpoints** such as `/repos/search` and `/repos/issues/search`
are intentionally *not* treated as repository-scoped, so `repository` is
`null` for them.
- **Non-repository writes** such as `POST /user/repos` or `POST /orgs` are denied
with *"write operation requires a repository target"*. This is the secure
default — the per-user permission model is repository-scoped, so there is no
repository against which to verify the write. This behavior is intentional and
is not worked around.
- **Service-PAT mode:** non-repository endpoints (for example `GET /user/orgs`)
are denied by the central gate because per-user permission can only be verified
against a repository target. Use the dedicated tools for those, or run in
OAuth-only mode.
+15
View File
@@ -4,5 +4,20 @@ defaults:
tools:
deny: []
# The generic `gitea_request` tool authorizes each call under a coarse virtual
# tool name of the form `gitea_request:<METHOD>:<top-path-segment>`, e.g.
# `gitea_request:GET:repos` or `gitea_request:DELETE:repos`. To keep raw
# dispatch read-only while still allowing GETs, deny the write methods here:
#
# deny:
# - gitea_request:POST:repos
# - gitea_request:PUT:repos
# - gitea_request:PATCH:repos
# - gitea_request:DELETE:repos
#
# NOTE: The admin/credential denylist (/admin, *tokens*, *secrets*, *hooks*,
# *keys*, applications/oauth2, runner registration tokens) is enforced in the
# handler independently of this file and is NOT configured here. It can only be
# overridden by setting RAW_API_ALLOW_SENSITIVE=true.
repositories: {}
+13
View File
@@ -211,6 +211,19 @@ class Settings(BaseSettings):
"Disabled by default."
),
)
# Raw API dispatch (gitea_request escape hatch)
raw_api_enabled: bool = Field(
default=True,
description="Enable the generic gitea_request raw API dispatch tool",
)
raw_api_allow_sensitive: bool = Field(
default=False,
description=(
"Allow gitea_request to reach admin/credential endpoints "
"(/admin, *tokens*, *secrets*, *hooks*, *keys*, applications/oauth2, "
"runner registration tokens). Disabled by default."
),
)
automation_enabled: bool = Field(
default=False,
description="Enable automation endpoints and workflows",
+43
View File
@@ -148,6 +148,49 @@ class GiteaClient:
)
raise
async def raw_request(
self,
method: str,
endpoint: str,
*,
params: dict[str, Any] | None = None,
json_body: dict[str, Any] | None = None,
) -> Any:
"""Dispatch an arbitrary Gitea REST request for the ``gitea_request`` tool.
Only the method and normalized endpoint are audited; the request body is
never logged so secrets embedded in payloads are not persisted.
"""
correlation_id = self.audit.log_tool_invocation(
tool_name="gitea_request",
params={"method": method, "path": endpoint},
result_status="pending",
)
try:
result = await self._request(
method,
endpoint,
correlation_id=correlation_id,
params=params,
json_body=json_body,
)
self.audit.log_tool_invocation(
tool_name="gitea_request",
correlation_id=correlation_id,
result_status="success",
params={"method": method, "path": endpoint},
)
return result
except Exception as exc:
self.audit.log_tool_invocation(
tool_name="gitea_request",
correlation_id=correlation_id,
result_status="error",
params={"method": method, "path": endpoint},
error=str(exc),
)
raise
async def list_repositories(self) -> list[dict[str, Any]]:
"""List repositories visible to the authenticated user."""
correlation_id = self.audit.log_tool_invocation(
+32
View File
@@ -718,6 +718,38 @@ AVAILABLE_TOOLS: list[MCPTool] = [
},
write_operation=True,
),
_tool(
"gitea_request",
(
"Generic escape hatch that calls an arbitrary Gitea REST endpoint "
"(method + path). Prefer the dedicated tools; use this only for "
"endpoints they do not cover. Subject to policy, write-mode and the "
"sensitive-path denylist. Methods other than GET/HEAD are writes and "
"require write-mode plus a whitelisted repository."
),
{
"type": "object",
"properties": {
"method": {
"type": "string",
"enum": ["GET", "HEAD", "POST", "PUT", "PATCH", "DELETE"],
},
"path": {
"type": "string",
"description": "Gitea REST path, e.g. /repos/{owner}/{repo}/pulls/1/merge",
},
"query": {"type": "object", "description": "Optional query-string parameters"},
"body": {"type": "object", "description": "Optional JSON request body"},
},
"required": ["method", "path"],
"additionalProperties": False,
},
# write_operation is intentionally False: a static flag cannot describe a
# tool that is read OR write depending on the method. Setting it True
# would force the central write-mode gate on GETs and break reads. The
# handler is authoritative via its own per-method authorize() call.
write_operation=False,
),
]
+4
View File
@@ -60,6 +60,7 @@ from aegis_gitea_mcp.request_context import (
)
from aegis_gitea_mcp.security import sanitize_data
from aegis_gitea_mcp.tools.arguments import extract_repository, extract_target_path
from aegis_gitea_mcp.tools.raw_tools import raw_api_request_tool
from aegis_gitea_mcp.tools.read_tools import (
compare_refs_tool,
get_branch_tool,
@@ -420,6 +421,9 @@ TOOL_HANDLERS: dict[str, ToolHandler] = {
"create_branch": create_branch_tool,
"create_milestone": create_milestone_tool,
"edit_issue_comment": edit_issue_comment_tool,
# Generic raw API dispatch (escape hatch). Registered as a read tool so GETs
# work without write-mode; the handler authorizes writes per-method itself.
"gitea_request": raw_api_request_tool,
}
+154 -1
View File
@@ -2,7 +2,9 @@
from __future__ import annotations
from typing import Annotated, Literal
import re
from typing import Annotated, Any, Literal
from urllib.parse import urlsplit
from pydantic import (
AfterValidator,
@@ -10,6 +12,7 @@ from pydantic import (
BeforeValidator,
ConfigDict,
Field,
field_validator,
model_validator,
)
@@ -446,6 +449,137 @@ class RepoTopicsArgs(RepositoryArgs):
"""Arguments for list_repo_topics."""
# --- Raw API dispatch (gitea_request escape hatch) -------------------------
# HTTP methods the generic dispatch tool accepts. Everything outside GET/HEAD is
# treated as a write so the policy/write-mode gate applies.
RAW_API_METHODS = ("GET", "HEAD", "POST", "PUT", "PATCH", "DELETE")
_RAW_WRITE_METHODS = frozenset({"POST", "PUT", "PATCH", "DELETE"})
# Path segments/subpaths blocked for *every* method unless explicitly overridden
# via RAW_API_ALLOW_SENSITIVE. A GET on these already leaks credentials or
# privileged configuration, so they are denied independently of policy.yaml.
_RAW_SENSITIVE_SEGMENTS = frozenset({"admin", "tokens", "secrets", "hooks", "keys", "gpg_keys"})
_RAW_SENSITIVE_SUBPATHS = ("applications/oauth2", "actions/runners/registration-token")
# Endpoints under /repos/ that are not scoped to a single repository.
_RAW_CROSS_REPO_OWNERS = frozenset({"search", "issues"})
# Resources whose trailing segments form a file path target for policy checks.
_RAW_FILE_RESOURCES = frozenset({"contents", "raw", "media"})
def normalize_raw_endpoint(path: str) -> str:
"""Normalize a raw API path into an ``/api/v1``-prefixed endpoint.
Accepts a bare path (``/repos/o/r``), an already-prefixed path
(``/api/v1/repos/o/r``), or a full URL (the scheme/host and any query string
are stripped — the separate ``query`` argument carries query parameters).
Raises:
ValueError: When the path contains a ``..`` traversal segment.
"""
candidate = path.strip()
split = urlsplit(candidate)
# When a full URL is supplied, keep only its path component.
raw_path = split.path if (split.scheme or split.netloc) else candidate
# Drop any query/fragment a caller may have inlined into the path string.
raw_path = raw_path.split("?", 1)[0].split("#", 1)[0]
raw_path = raw_path.replace("\\", "/")
segments = [seg for seg in raw_path.split("/") if seg and seg != "."]
if any(seg == ".." for seg in segments):
raise ValueError("path must not contain '..' traversal segments")
rel_segments = segments[2:] if segments[:2] == ["api", "v1"] else segments
if not rel_segments:
return "/api/v1"
return "/api/v1/" + "/".join(rel_segments)
def _raw_relative_segments(endpoint: str) -> list[str]:
"""Return the endpoint segments after the ``/api/v1`` prefix."""
segments = [seg for seg in endpoint.split("/") if seg]
return segments[2:] if segments[:2] == ["api", "v1"] else segments
def raw_top_segment(endpoint: str) -> str:
"""Return the first path segment after ``/api/v1`` for coarse policy grouping."""
rel = _raw_relative_segments(endpoint)
return rel[0] if rel else ""
def raw_method_is_write(method: str) -> bool:
"""Return whether an HTTP method mutates state."""
return method.upper() in _RAW_WRITE_METHODS
def raw_is_sensitive(endpoint: str) -> bool:
"""Return whether an endpoint touches an admin/credential surface."""
rel = _raw_relative_segments(endpoint)
if any(seg in _RAW_SENSITIVE_SEGMENTS for seg in rel):
return True
joined = "/".join(rel)
return any(sub in joined for sub in _RAW_SENSITIVE_SUBPATHS)
def _raw_repo_segments(endpoint: str) -> list[str] | None:
"""Return ``[owner, repo, *rest]`` for a single-repository endpoint, else None."""
rel = _raw_relative_segments(endpoint)
if len(rel) < 3 or rel[0] != "repos":
return None
owner, repo = rel[1], rel[2]
if owner in _RAW_CROSS_REPO_OWNERS:
return None
if not (re.match(_REPO_PART_PATTERN, owner) and re.match(_REPO_PART_PATTERN, repo)):
return None
return [owner, repo, *rel[3:]]
def parse_raw_repository(endpoint: str) -> str | None:
"""Parse ``owner/repo`` from a repo-scoped endpoint; None for cross-repo paths."""
repo_segments = _raw_repo_segments(endpoint)
if repo_segments is None:
return None
return f"{repo_segments[0]}/{repo_segments[1]}"
def parse_raw_target_path(endpoint: str) -> str | None:
"""Parse a file-path target from ``contents``/``raw``/``media`` endpoints."""
repo_segments = _raw_repo_segments(endpoint)
if repo_segments is None or len(repo_segments) < 4:
return None
if repo_segments[2] not in _RAW_FILE_RESOURCES:
return None
file_path = "/".join(repo_segments[3:])
return file_path or None
class RawApiRequestArgs(StrictBaseModel):
"""Arguments for the generic ``gitea_request`` escape-hatch tool."""
method: Literal["GET", "HEAD", "POST", "PUT", "PATCH", "DELETE"] = Field(
..., description="HTTP method"
)
path: str = Field(..., min_length=1, max_length=2048, description="Gitea REST path")
query: dict[str, Any] | None = Field(
default=None, description="Optional query-string parameters"
)
body: dict[str, Any] | None = Field(default=None, description="Optional JSON request body")
@field_validator("method", mode="before")
@classmethod
def _normalize_method(cls, value: object) -> object:
"""Uppercase the method before enum validation so 'get' is accepted."""
if isinstance(value, str):
return value.strip().upper()
return value
@model_validator(mode="after")
def _validate_path(self) -> RawApiRequestArgs:
"""Reject path traversal up front so the handler sees a clean endpoint."""
normalize_raw_endpoint(self.path)
return self
def extract_repository(arguments: dict[str, object]) -> str | None:
"""Extract `owner/repo` from raw argument mapping.
@@ -459,6 +593,16 @@ def extract_repository(arguments: dict[str, object]) -> str | None:
repo = arguments.get("repo")
if isinstance(owner, str) and isinstance(repo, str) and owner and repo:
return f"{owner}/{repo}"
# Raw API dispatch: derive the repository from the request path so the central
# policy gate and the service-PAT per-user permission check evaluate the real
# target instead of treating every raw call as repo-less.
path = arguments.get("path")
method = arguments.get("method")
if isinstance(path, str) and isinstance(method, str):
try:
return parse_raw_repository(normalize_raw_endpoint(path))
except ValueError:
return None
return None
@@ -467,4 +611,13 @@ def extract_target_path(arguments: dict[str, object]) -> str | None:
filepath = arguments.get("filepath")
if isinstance(filepath, str) and filepath:
return filepath
# Raw API dispatch: expose the file path embedded in contents/raw/media
# endpoints so repository path allow/deny rules still apply to raw calls.
path = arguments.get("path")
method = arguments.get("method")
if isinstance(path, str) and isinstance(method, str):
try:
return parse_raw_target_path(normalize_raw_endpoint(path))
except ValueError:
return None
return None
+129
View File
@@ -0,0 +1,129 @@
"""Generic raw Gitea REST dispatch tool (escape hatch).
``gitea_request`` exposes the long tail of the Gitea API that the curated, typed
tools do not cover. A single tool surface would normally collapse the
granularity of ``policy.yaml``, so this handler re-derives a coarse virtual tool
name (``gitea_request:<METHOD>:<top-segment>``) and the target repository/path
from each request and runs them back through the policy engine. That reuses the
existing write-mode + write-whitelist enforcement and keeps per-method/per-repo
policy control intact behind the single tool.
Two layers of authorization apply:
* The central dispatch gate in ``server.py`` allows/denies the registered
``gitea_request`` name and, in service-PAT mode, verifies the signed-in user's
permission on the parsed repository.
* This handler then authorizes the fine-grained virtual tool name and enforces a
built-in admin/credential denylist that ``policy.yaml`` cannot re-open.
"""
from __future__ import annotations
import json
from typing import Any
from fastapi import HTTPException
from aegis_gitea_mcp.audit import get_audit_logger
from aegis_gitea_mcp.config import get_settings
from aegis_gitea_mcp.gitea_client import (
GiteaAuthenticationError,
GiteaAuthorizationError,
GiteaClient,
GiteaError,
)
from aegis_gitea_mcp.policy import get_policy_engine
from aegis_gitea_mcp.response_limits import limit_items, limit_text
from aegis_gitea_mcp.tools.arguments import (
RawApiRequestArgs,
normalize_raw_endpoint,
parse_raw_repository,
parse_raw_target_path,
raw_is_sensitive,
raw_method_is_write,
raw_top_segment,
)
def _bound_response(data: Any) -> dict[str, Any]:
"""Bound a raw response into stable, size-limited envelope fields."""
if isinstance(data, list):
bounded, omitted = limit_items(list(data))
return {"data": bounded, "count": len(bounded), "omitted": omitted}
if isinstance(data, dict):
serialized = json.dumps(data, ensure_ascii=False, default=str)
capped = limit_text(serialized)
if len(capped) < len(serialized):
# Oversized dict: return a truncated JSON string instead of the object.
return {"data": capped, "truncated": True}
return {"data": data, "truncated": False}
if isinstance(data, str):
return {"data": limit_text(data)}
return {"data": data}
async def raw_api_request_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""Dispatch an arbitrary Gitea REST endpoint subject to policy and denylists."""
settings = get_settings()
audit = get_audit_logger()
if not settings.raw_api_enabled:
raise HTTPException(
status_code=403,
detail="Raw API dispatch is disabled (set RAW_API_ENABLED=true to enable).",
)
parsed = RawApiRequestArgs.model_validate(arguments)
method = parsed.method
endpoint = normalize_raw_endpoint(parsed.path)
is_write = raw_method_is_write(method)
# Admin/credential denylist applies to every method and cannot be re-opened
# from policy.yaml — only RAW_API_ALLOW_SENSITIVE overrides it.
if raw_is_sensitive(endpoint) and not settings.raw_api_allow_sensitive:
audit.log_access_denied(tool_name="gitea_request", reason="raw_sensitive_path_denied")
raise HTTPException(
status_code=403,
detail=(
"Endpoint targets an admin/credential surface blocked by the raw-API "
"sensitive-path denylist."
),
)
repository = parse_raw_repository(endpoint)
target_path = parse_raw_target_path(endpoint)
# Coarse, stable virtual tool name so policy.yaml can allow/deny by method +
# top-level path segment (policy matches tool names by exact set membership).
policy_tool_name = f"gitea_request:{method}:{raw_top_segment(endpoint)}"
decision = get_policy_engine().authorize(
tool_name=policy_tool_name,
is_write=is_write,
repository=repository,
target_path=target_path,
)
if not decision.allowed:
audit.log_access_denied(
tool_name=policy_tool_name,
repository=repository,
reason=decision.reason,
)
raise HTTPException(status_code=403, detail=f"Policy denied raw request: {decision.reason}")
try:
data = await gitea.raw_request(method, endpoint, params=parsed.query, json_body=parsed.body)
except (GiteaAuthenticationError, GiteaAuthorizationError):
# Let auth/authz failures surface so the server returns actionable
# re-authorization guidance instead of a generic internal error.
raise
except GiteaError as exc:
raise RuntimeError(f"Raw API request failed: {exc}") from exc
envelope: dict[str, Any] = {
"method": method,
"path": endpoint,
"write": is_write,
"repository": repository,
}
envelope.update(_bound_response(data))
return envelope
+321
View File
@@ -0,0 +1,321 @@
"""Tests for the generic gitea_request raw API dispatch tool."""
from __future__ import annotations
from pathlib import Path
from typing import Any
import pytest
from fastapi import HTTPException
from pydantic import ValidationError
from aegis_gitea_mcp.config import reset_settings
from aegis_gitea_mcp.tools.arguments import (
extract_repository,
extract_target_path,
normalize_raw_endpoint,
parse_raw_repository,
parse_raw_target_path,
raw_is_sensitive,
raw_top_segment,
)
from aegis_gitea_mcp.tools.raw_tools import raw_api_request_tool
@pytest.fixture
def raw_env(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""Minimal API-key-mode settings with policy that allows reads, denies writes."""
reset_settings()
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
monkeypatch.setenv("GITEA_TOKEN", "test-token")
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
monkeypatch.setenv("ENVIRONMENT", "test")
# Point at a non-existent policy file so the default config applies
# (read: allow, write: deny) and tests do not depend on the repo policy.yaml.
monkeypatch.setenv("POLICY_FILE_PATH", str(tmp_path / "missing-policy.yaml"))
class StubRawGitea:
"""Stub Gitea client capturing raw_request calls."""
def __init__(self, response: Any = None) -> None:
self._response: Any = {"ok": True} if response is None else response
self.calls: list[dict[str, Any]] = []
async def raw_request(
self,
method: str,
endpoint: str,
*,
params: dict[str, Any] | None = None,
json_body: dict[str, Any] | None = None,
) -> Any:
self.calls.append(
{"method": method, "endpoint": endpoint, "params": params, "json_body": json_body}
)
return self._response
# --- Handler behavior ------------------------------------------------------
async def test_get_repo_endpoint_allowed_and_parses_repository(raw_env: None) -> None:
"""A GET on a repo endpoint is allowed and parses owner/repo from the path."""
stub = StubRawGitea({"number": 1})
result = await raw_api_request_tool(stub, {"method": "GET", "path": "/repos/acme/app/pulls/1"})
assert result["method"] == "GET"
assert result["path"] == "/api/v1/repos/acme/app/pulls/1"
assert result["write"] is False
assert result["repository"] == "acme/app"
assert result["data"] == {"number": 1}
assert stub.calls[0]["endpoint"] == "/api/v1/repos/acme/app/pulls/1"
async def test_lowercase_method_is_normalized(raw_env: None) -> None:
"""A lowercase method is uppercased and accepted."""
stub = StubRawGitea([{"id": 1}])
result = await raw_api_request_tool(stub, {"method": "get", "path": "/repos/acme/app/issues"})
assert result["method"] == "GET"
assert result["count"] == 1
async def test_delete_denied_when_write_mode_off(raw_env: None) -> None:
"""A write method is denied (no network call) while write-mode is disabled."""
stub = StubRawGitea()
with pytest.raises(HTTPException) as exc_info:
await raw_api_request_tool(stub, {"method": "DELETE", "path": "/repos/acme/app/issues/1"})
assert exc_info.value.status_code == 403
assert "write mode is disabled" in str(exc_info.value.detail)
assert stub.calls == []
async def test_write_allowed_with_write_mode_and_whitelist(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
"""A write succeeds only when write-mode is on, the repo is whitelisted, and policy allows."""
policy_file = tmp_path / "policy.yaml"
policy_file.write_text("defaults:\n read: allow\n write: allow\n", encoding="utf-8")
reset_settings()
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
monkeypatch.setenv("GITEA_TOKEN", "test-token")
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
monkeypatch.setenv("ENVIRONMENT", "test")
monkeypatch.setenv("POLICY_FILE_PATH", str(policy_file))
monkeypatch.setenv("WRITE_MODE", "true")
monkeypatch.setenv("WRITE_REPOSITORY_WHITELIST", "acme/app")
stub = StubRawGitea({"merged": True})
result = await raw_api_request_tool(
stub,
{"method": "PUT", "path": "/repos/acme/app/pulls/1/merge", "body": {"Do": "merge"}},
)
assert result["write"] is True
assert result["repository"] == "acme/app"
assert stub.calls[0]["json_body"] == {"Do": "merge"}
async def test_write_denied_for_repo_outside_whitelist(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
"""A write on a repo not in the whitelist is denied even with write-mode on."""
policy_file = tmp_path / "policy.yaml"
policy_file.write_text("defaults:\n read: allow\n write: allow\n", encoding="utf-8")
reset_settings()
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
monkeypatch.setenv("GITEA_TOKEN", "test-token")
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
monkeypatch.setenv("ENVIRONMENT", "test")
monkeypatch.setenv("POLICY_FILE_PATH", str(policy_file))
monkeypatch.setenv("WRITE_MODE", "true")
monkeypatch.setenv("WRITE_REPOSITORY_WHITELIST", "acme/other")
stub = StubRawGitea()
with pytest.raises(HTTPException) as exc_info:
await raw_api_request_tool(stub, {"method": "POST", "path": "/repos/acme/app/issues"})
assert exc_info.value.status_code == 403
assert "whitelist" in str(exc_info.value.detail)
assert stub.calls == []
async def test_non_repository_write_denied(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""A write that targets no repository is denied (secure default)."""
policy_file = tmp_path / "policy.yaml"
policy_file.write_text("defaults:\n read: allow\n write: allow\n", encoding="utf-8")
reset_settings()
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
monkeypatch.setenv("GITEA_TOKEN", "test-token")
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
monkeypatch.setenv("ENVIRONMENT", "test")
monkeypatch.setenv("POLICY_FILE_PATH", str(policy_file))
monkeypatch.setenv("WRITE_MODE", "true")
monkeypatch.setenv("WRITE_REPOSITORY_WHITELIST", "acme/app")
stub = StubRawGitea()
with pytest.raises(HTTPException) as exc_info:
await raw_api_request_tool(stub, {"method": "POST", "path": "/user/repos"})
assert exc_info.value.status_code == 403
assert "repository target" in str(exc_info.value.detail)
assert stub.calls == []
@pytest.mark.parametrize(
"path",
["/admin/users", "/users/bob/tokens", "/repos/acme/app/hooks", "/user/keys"],
)
async def test_sensitive_paths_denied_on_get(raw_env: None, path: str) -> None:
"""Admin/credential surfaces are denied for every method, including GET."""
stub = StubRawGitea()
with pytest.raises(HTTPException) as exc_info:
await raw_api_request_tool(stub, {"method": "GET", "path": path})
assert exc_info.value.status_code == 403
assert "sensitive-path denylist" in str(exc_info.value.detail)
assert stub.calls == []
async def test_sensitive_path_allowed_with_override(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
"""RAW_API_ALLOW_SENSITIVE bypasses the admin/credential denylist."""
reset_settings()
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
monkeypatch.setenv("GITEA_TOKEN", "test-token")
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
monkeypatch.setenv("ENVIRONMENT", "test")
monkeypatch.setenv("POLICY_FILE_PATH", str(tmp_path / "missing.yaml"))
monkeypatch.setenv("RAW_API_ALLOW_SENSITIVE", "true")
stub = StubRawGitea([{"id": 1}])
result = await raw_api_request_tool(stub, {"method": "GET", "path": "/admin/users"})
assert result["data"] == [{"id": 1}]
assert stub.calls[0]["endpoint"] == "/api/v1/admin/users"
async def test_cross_repo_search_not_treated_as_repository(raw_env: None) -> None:
"""/repos/issues/search is a cross-repo endpoint, so repository is None."""
stub = StubRawGitea([{"id": 1}])
result = await raw_api_request_tool(
stub, {"method": "GET", "path": "/repos/issues/search", "query": {"q": "bug"}}
)
assert result["repository"] is None
assert result["count"] == 1
assert stub.calls[0]["params"] == {"q": "bug"}
async def test_unknown_method_rejected_before_network(raw_env: None) -> None:
"""An unknown HTTP method is rejected during validation before any network call."""
stub = StubRawGitea()
with pytest.raises(ValidationError):
await raw_api_request_tool(stub, {"method": "OPTIONS", "path": "/repos/acme/app"})
assert stub.calls == []
async def test_path_traversal_rejected(raw_env: None) -> None:
"""A path containing '..' is rejected during validation."""
stub = StubRawGitea()
with pytest.raises(ValidationError):
await raw_api_request_tool(
stub, {"method": "GET", "path": "/repos/acme/app/../../admin/users"}
)
assert stub.calls == []
async def test_full_url_is_reduced_to_path(raw_env: None) -> None:
"""A full URL is reduced to just the API path."""
stub = StubRawGitea({"name": "app"})
result = await raw_api_request_tool(
stub,
{
"method": "GET",
"path": "https://gitea.example.com/api/v1/repos/acme/app/contents/src/app.py?ref=main",
},
)
assert result["path"] == "/api/v1/repos/acme/app/contents/src/app.py"
assert result["repository"] == "acme/app"
async def test_raw_api_disabled(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""The killswitch disables every dispatch."""
reset_settings()
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
monkeypatch.setenv("GITEA_TOKEN", "test-token")
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
monkeypatch.setenv("ENVIRONMENT", "test")
monkeypatch.setenv("POLICY_FILE_PATH", str(tmp_path / "missing.yaml"))
monkeypatch.setenv("RAW_API_ENABLED", "false")
stub = StubRawGitea()
with pytest.raises(HTTPException) as exc_info:
await raw_api_request_tool(stub, {"method": "GET", "path": "/repos/acme/app"})
assert exc_info.value.status_code == 403
assert "disabled" in str(exc_info.value.detail)
assert stub.calls == []
async def test_large_dict_response_is_truncated(raw_env: None) -> None:
"""An oversized object response is returned as a truncated JSON string."""
big = {"blob": "x" * 50_000}
stub = StubRawGitea(big)
result = await raw_api_request_tool(stub, {"method": "GET", "path": "/repos/acme/app"})
assert result["truncated"] is True
assert isinstance(result["data"], str)
# --- Path parsing helpers --------------------------------------------------
@pytest.mark.parametrize(
("path", "expected"),
[
("/repos/acme/app", "/api/v1/repos/acme/app"),
("repos/acme/app", "/api/v1/repos/acme/app"),
("/api/v1/repos/acme/app", "/api/v1/repos/acme/app"),
("/", "/api/v1"),
("", "/api/v1"),
],
)
def test_normalize_raw_endpoint(path: str, expected: str) -> None:
assert normalize_raw_endpoint(path) == expected
def test_normalize_raw_endpoint_rejects_traversal() -> None:
with pytest.raises(ValueError):
normalize_raw_endpoint("/repos/acme/../admin")
def test_parse_raw_repository_variants() -> None:
assert parse_raw_repository("/api/v1/repos/acme/app/pulls/1") == "acme/app"
assert parse_raw_repository("/api/v1/repos/search") is None
assert parse_raw_repository("/api/v1/repos/issues/search") is None
assert parse_raw_repository("/api/v1/user/repos") is None
def test_parse_raw_target_path() -> None:
assert parse_raw_target_path("/api/v1/repos/acme/app/contents/src/app.py") == "src/app.py"
assert parse_raw_target_path("/api/v1/repos/acme/app/raw/README.md") == "README.md"
assert parse_raw_target_path("/api/v1/repos/acme/app/pulls/1") is None
def test_raw_top_segment_and_sensitivity() -> None:
assert raw_top_segment("/api/v1/repos/acme/app") == "repos"
assert raw_top_segment("/api/v1") == ""
assert raw_is_sensitive("/api/v1/repos/acme/app/hooks") is True
assert raw_is_sensitive("/api/v1/user/applications/oauth2") is True
assert raw_is_sensitive("/api/v1/repos/acme/app/pulls") is False
def test_extractors_are_raw_aware() -> None:
raw_args = {"method": "GET", "path": "/repos/acme/app/contents/src/app.py"}
assert extract_repository(raw_args) == "acme/app"
assert extract_target_path(raw_args) == "src/app.py"
# Malformed raw path must not raise from the extractors.
assert extract_repository({"method": "GET", "path": "/repos/acme/../x"}) is None
assert extract_target_path({"method": "GET", "path": "/repos/acme/../x"}) is None