feat(raw-api): add gitea_request schema, path parsing, client dispatch and handler

Adds the RawApiRequestArgs schema (extra=forbid), raw path normalization/
parsing helpers, a GiteaClient.raw_request that audits method+path only (never
the body), and the raw_api_request_tool handler. The handler derives a coarse
virtual tool name (gitea_request:METHOD:topsegment) plus repository/target_path
from the path and runs them back through the policy engine, enforces an
admin/credential sensitive-path denylist, and bounds responses. Two config
flags gate it: RAW_API_ENABLED (killswitch) and RAW_API_ALLOW_SENSITIVE.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-26 12:26:26 +02:00
parent 227122263b
commit 2844c42ec8
4 changed files with 339 additions and 1 deletions
+13
View File
@@ -211,6 +211,19 @@ class Settings(BaseSettings):
"Disabled by default." "Disabled by default."
), ),
) )
# Raw API dispatch (gitea_request escape hatch)
raw_api_enabled: bool = Field(
default=True,
description="Enable the generic gitea_request raw API dispatch tool",
)
raw_api_allow_sensitive: bool = Field(
default=False,
description=(
"Allow gitea_request to reach admin/credential endpoints "
"(/admin, *tokens*, *secrets*, *hooks*, *keys*, applications/oauth2, "
"runner registration tokens). Disabled by default."
),
)
automation_enabled: bool = Field( automation_enabled: bool = Field(
default=False, default=False,
description="Enable automation endpoints and workflows", description="Enable automation endpoints and workflows",
+43
View File
@@ -148,6 +148,49 @@ class GiteaClient:
) )
raise raise
async def raw_request(
self,
method: str,
endpoint: str,
*,
params: dict[str, Any] | None = None,
json_body: dict[str, Any] | None = None,
) -> Any:
"""Dispatch an arbitrary Gitea REST request for the ``gitea_request`` tool.
Only the method and normalized endpoint are audited; the request body is
never logged so secrets embedded in payloads are not persisted.
"""
correlation_id = self.audit.log_tool_invocation(
tool_name="gitea_request",
params={"method": method, "path": endpoint},
result_status="pending",
)
try:
result = await self._request(
method,
endpoint,
correlation_id=correlation_id,
params=params,
json_body=json_body,
)
self.audit.log_tool_invocation(
tool_name="gitea_request",
correlation_id=correlation_id,
result_status="success",
params={"method": method, "path": endpoint},
)
return result
except Exception as exc:
self.audit.log_tool_invocation(
tool_name="gitea_request",
correlation_id=correlation_id,
result_status="error",
params={"method": method, "path": endpoint},
error=str(exc),
)
raise
async def list_repositories(self) -> list[dict[str, Any]]: async def list_repositories(self) -> list[dict[str, Any]]:
"""List repositories visible to the authenticated user.""" """List repositories visible to the authenticated user."""
correlation_id = self.audit.log_tool_invocation( correlation_id = self.audit.log_tool_invocation(
+154 -1
View File
@@ -2,7 +2,9 @@
from __future__ import annotations from __future__ import annotations
from typing import Annotated, Literal import re
from typing import Annotated, Any, Literal
from urllib.parse import urlsplit
from pydantic import ( from pydantic import (
AfterValidator, AfterValidator,
@@ -10,6 +12,7 @@ from pydantic import (
BeforeValidator, BeforeValidator,
ConfigDict, ConfigDict,
Field, Field,
field_validator,
model_validator, model_validator,
) )
@@ -446,6 +449,137 @@ class RepoTopicsArgs(RepositoryArgs):
"""Arguments for list_repo_topics.""" """Arguments for list_repo_topics."""
# --- Raw API dispatch (gitea_request escape hatch) -------------------------
# HTTP methods the generic dispatch tool accepts. Everything outside GET/HEAD is
# treated as a write so the policy/write-mode gate applies.
RAW_API_METHODS = ("GET", "HEAD", "POST", "PUT", "PATCH", "DELETE")
_RAW_WRITE_METHODS = frozenset({"POST", "PUT", "PATCH", "DELETE"})
# Path segments/subpaths blocked for *every* method unless explicitly overridden
# via RAW_API_ALLOW_SENSITIVE. A GET on these already leaks credentials or
# privileged configuration, so they are denied independently of policy.yaml.
_RAW_SENSITIVE_SEGMENTS = frozenset({"admin", "tokens", "secrets", "hooks", "keys", "gpg_keys"})
_RAW_SENSITIVE_SUBPATHS = ("applications/oauth2", "actions/runners/registration-token")
# Endpoints under /repos/ that are not scoped to a single repository.
_RAW_CROSS_REPO_OWNERS = frozenset({"search", "issues"})
# Resources whose trailing segments form a file path target for policy checks.
_RAW_FILE_RESOURCES = frozenset({"contents", "raw", "media"})
def normalize_raw_endpoint(path: str) -> str:
"""Normalize a raw API path into an ``/api/v1``-prefixed endpoint.
Accepts a bare path (``/repos/o/r``), an already-prefixed path
(``/api/v1/repos/o/r``), or a full URL (the scheme/host and any query string
are stripped — the separate ``query`` argument carries query parameters).
Raises:
ValueError: When the path contains a ``..`` traversal segment.
"""
candidate = path.strip()
split = urlsplit(candidate)
# When a full URL is supplied, keep only its path component.
raw_path = split.path if (split.scheme or split.netloc) else candidate
# Drop any query/fragment a caller may have inlined into the path string.
raw_path = raw_path.split("?", 1)[0].split("#", 1)[0]
raw_path = raw_path.replace("\\", "/")
segments = [seg for seg in raw_path.split("/") if seg and seg != "."]
if any(seg == ".." for seg in segments):
raise ValueError("path must not contain '..' traversal segments")
rel_segments = segments[2:] if segments[:2] == ["api", "v1"] else segments
if not rel_segments:
return "/api/v1"
return "/api/v1/" + "/".join(rel_segments)
def _raw_relative_segments(endpoint: str) -> list[str]:
"""Return the endpoint segments after the ``/api/v1`` prefix."""
segments = [seg for seg in endpoint.split("/") if seg]
return segments[2:] if segments[:2] == ["api", "v1"] else segments
def raw_top_segment(endpoint: str) -> str:
"""Return the first path segment after ``/api/v1`` for coarse policy grouping."""
rel = _raw_relative_segments(endpoint)
return rel[0] if rel else ""
def raw_method_is_write(method: str) -> bool:
"""Return whether an HTTP method mutates state."""
return method.upper() in _RAW_WRITE_METHODS
def raw_is_sensitive(endpoint: str) -> bool:
"""Return whether an endpoint touches an admin/credential surface."""
rel = _raw_relative_segments(endpoint)
if any(seg in _RAW_SENSITIVE_SEGMENTS for seg in rel):
return True
joined = "/".join(rel)
return any(sub in joined for sub in _RAW_SENSITIVE_SUBPATHS)
def _raw_repo_segments(endpoint: str) -> list[str] | None:
"""Return ``[owner, repo, *rest]`` for a single-repository endpoint, else None."""
rel = _raw_relative_segments(endpoint)
if len(rel) < 3 or rel[0] != "repos":
return None
owner, repo = rel[1], rel[2]
if owner in _RAW_CROSS_REPO_OWNERS:
return None
if not (re.match(_REPO_PART_PATTERN, owner) and re.match(_REPO_PART_PATTERN, repo)):
return None
return [owner, repo, *rel[3:]]
def parse_raw_repository(endpoint: str) -> str | None:
"""Parse ``owner/repo`` from a repo-scoped endpoint; None for cross-repo paths."""
repo_segments = _raw_repo_segments(endpoint)
if repo_segments is None:
return None
return f"{repo_segments[0]}/{repo_segments[1]}"
def parse_raw_target_path(endpoint: str) -> str | None:
"""Parse a file-path target from ``contents``/``raw``/``media`` endpoints."""
repo_segments = _raw_repo_segments(endpoint)
if repo_segments is None or len(repo_segments) < 4:
return None
if repo_segments[2] not in _RAW_FILE_RESOURCES:
return None
file_path = "/".join(repo_segments[3:])
return file_path or None
class RawApiRequestArgs(StrictBaseModel):
"""Arguments for the generic ``gitea_request`` escape-hatch tool."""
method: Literal["GET", "HEAD", "POST", "PUT", "PATCH", "DELETE"] = Field(
..., description="HTTP method"
)
path: str = Field(..., min_length=1, max_length=2048, description="Gitea REST path")
query: dict[str, Any] | None = Field(
default=None, description="Optional query-string parameters"
)
body: dict[str, Any] | None = Field(default=None, description="Optional JSON request body")
@field_validator("method", mode="before")
@classmethod
def _normalize_method(cls, value: object) -> object:
"""Uppercase the method before enum validation so 'get' is accepted."""
if isinstance(value, str):
return value.strip().upper()
return value
@model_validator(mode="after")
def _validate_path(self) -> RawApiRequestArgs:
"""Reject path traversal up front so the handler sees a clean endpoint."""
normalize_raw_endpoint(self.path)
return self
def extract_repository(arguments: dict[str, object]) -> str | None: def extract_repository(arguments: dict[str, object]) -> str | None:
"""Extract `owner/repo` from raw argument mapping. """Extract `owner/repo` from raw argument mapping.
@@ -459,6 +593,16 @@ def extract_repository(arguments: dict[str, object]) -> str | None:
repo = arguments.get("repo") repo = arguments.get("repo")
if isinstance(owner, str) and isinstance(repo, str) and owner and repo: if isinstance(owner, str) and isinstance(repo, str) and owner and repo:
return f"{owner}/{repo}" return f"{owner}/{repo}"
# Raw API dispatch: derive the repository from the request path so the central
# policy gate and the service-PAT per-user permission check evaluate the real
# target instead of treating every raw call as repo-less.
path = arguments.get("path")
method = arguments.get("method")
if isinstance(path, str) and isinstance(method, str):
try:
return parse_raw_repository(normalize_raw_endpoint(path))
except ValueError:
return None
return None return None
@@ -467,4 +611,13 @@ def extract_target_path(arguments: dict[str, object]) -> str | None:
filepath = arguments.get("filepath") filepath = arguments.get("filepath")
if isinstance(filepath, str) and filepath: if isinstance(filepath, str) and filepath:
return filepath return filepath
# Raw API dispatch: expose the file path embedded in contents/raw/media
# endpoints so repository path allow/deny rules still apply to raw calls.
path = arguments.get("path")
method = arguments.get("method")
if isinstance(path, str) and isinstance(method, str):
try:
return parse_raw_target_path(normalize_raw_endpoint(path))
except ValueError:
return None
return None return None
+129
View File
@@ -0,0 +1,129 @@
"""Generic raw Gitea REST dispatch tool (escape hatch).
``gitea_request`` exposes the long tail of the Gitea API that the curated, typed
tools do not cover. A single tool surface would normally collapse the
granularity of ``policy.yaml``, so this handler re-derives a coarse virtual tool
name (``gitea_request:<METHOD>:<top-segment>``) and the target repository/path
from each request and runs them back through the policy engine. That reuses the
existing write-mode + write-whitelist enforcement and keeps per-method/per-repo
policy control intact behind the single tool.
Two layers of authorization apply:
* The central dispatch gate in ``server.py`` allows/denies the registered
``gitea_request`` name and, in service-PAT mode, verifies the signed-in user's
permission on the parsed repository.
* This handler then authorizes the fine-grained virtual tool name and enforces a
built-in admin/credential denylist that ``policy.yaml`` cannot re-open.
"""
from __future__ import annotations
import json
from typing import Any
from fastapi import HTTPException
from aegis_gitea_mcp.audit import get_audit_logger
from aegis_gitea_mcp.config import get_settings
from aegis_gitea_mcp.gitea_client import (
GiteaAuthenticationError,
GiteaAuthorizationError,
GiteaClient,
GiteaError,
)
from aegis_gitea_mcp.policy import get_policy_engine
from aegis_gitea_mcp.response_limits import limit_items, limit_text
from aegis_gitea_mcp.tools.arguments import (
RawApiRequestArgs,
normalize_raw_endpoint,
parse_raw_repository,
parse_raw_target_path,
raw_is_sensitive,
raw_method_is_write,
raw_top_segment,
)
def _bound_response(data: Any) -> dict[str, Any]:
"""Bound a raw response into stable, size-limited envelope fields."""
if isinstance(data, list):
bounded, omitted = limit_items(list(data))
return {"data": bounded, "count": len(bounded), "omitted": omitted}
if isinstance(data, dict):
serialized = json.dumps(data, ensure_ascii=False, default=str)
capped = limit_text(serialized)
if len(capped) < len(serialized):
# Oversized dict: return a truncated JSON string instead of the object.
return {"data": capped, "truncated": True}
return {"data": data, "truncated": False}
if isinstance(data, str):
return {"data": limit_text(data)}
return {"data": data}
async def raw_api_request_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""Dispatch an arbitrary Gitea REST endpoint subject to policy and denylists."""
settings = get_settings()
audit = get_audit_logger()
if not settings.raw_api_enabled:
raise HTTPException(
status_code=403,
detail="Raw API dispatch is disabled (set RAW_API_ENABLED=true to enable).",
)
parsed = RawApiRequestArgs.model_validate(arguments)
method = parsed.method
endpoint = normalize_raw_endpoint(parsed.path)
is_write = raw_method_is_write(method)
# Admin/credential denylist applies to every method and cannot be re-opened
# from policy.yaml — only RAW_API_ALLOW_SENSITIVE overrides it.
if raw_is_sensitive(endpoint) and not settings.raw_api_allow_sensitive:
audit.log_access_denied(tool_name="gitea_request", reason="raw_sensitive_path_denied")
raise HTTPException(
status_code=403,
detail=(
"Endpoint targets an admin/credential surface blocked by the raw-API "
"sensitive-path denylist."
),
)
repository = parse_raw_repository(endpoint)
target_path = parse_raw_target_path(endpoint)
# Coarse, stable virtual tool name so policy.yaml can allow/deny by method +
# top-level path segment (policy matches tool names by exact set membership).
policy_tool_name = f"gitea_request:{method}:{raw_top_segment(endpoint)}"
decision = get_policy_engine().authorize(
tool_name=policy_tool_name,
is_write=is_write,
repository=repository,
target_path=target_path,
)
if not decision.allowed:
audit.log_access_denied(
tool_name=policy_tool_name,
repository=repository,
reason=decision.reason,
)
raise HTTPException(status_code=403, detail=f"Policy denied raw request: {decision.reason}")
try:
data = await gitea.raw_request(method, endpoint, params=parsed.query, json_body=parsed.body)
except (GiteaAuthenticationError, GiteaAuthorizationError):
# Let auth/authz failures surface so the server returns actionable
# re-authorization guidance instead of a generic internal error.
raise
except GiteaError as exc:
raise RuntimeError(f"Raw API request failed: {exc}") from exc
envelope: dict[str, Any] = {
"method": method,
"path": endpoint,
"write": is_write,
"repository": repository,
}
envelope.update(_bound_response(data))
return envelope