From 2844c42ec82843d89d5079e44a47302361b9404b Mon Sep 17 00:00:00 2001 From: Latte Date: Fri, 26 Jun 2026 12:26:26 +0200 Subject: [PATCH] feat(raw-api): add gitea_request schema, path parsing, client dispatch and handler Adds the RawApiRequestArgs schema (extra=forbid), raw path normalization/ parsing helpers, a GiteaClient.raw_request that audits method+path only (never the body), and the raw_api_request_tool handler. The handler derives a coarse virtual tool name (gitea_request:METHOD:topsegment) plus repository/target_path from the path and runs them back through the policy engine, enforces an admin/credential sensitive-path denylist, and bounds responses. Two config flags gate it: RAW_API_ENABLED (killswitch) and RAW_API_ALLOW_SENSITIVE. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/aegis_gitea_mcp/config.py | 13 +++ src/aegis_gitea_mcp/gitea_client.py | 43 +++++++ src/aegis_gitea_mcp/tools/arguments.py | 155 ++++++++++++++++++++++++- src/aegis_gitea_mcp/tools/raw_tools.py | 129 ++++++++++++++++++++ 4 files changed, 339 insertions(+), 1 deletion(-) create mode 100644 src/aegis_gitea_mcp/tools/raw_tools.py diff --git a/src/aegis_gitea_mcp/config.py b/src/aegis_gitea_mcp/config.py index 4bb54ec..eb12736 100644 --- a/src/aegis_gitea_mcp/config.py +++ b/src/aegis_gitea_mcp/config.py @@ -211,6 +211,19 @@ class Settings(BaseSettings): "Disabled by default." ), ) + # Raw API dispatch (gitea_request escape hatch) + raw_api_enabled: bool = Field( + default=True, + description="Enable the generic gitea_request raw API dispatch tool", + ) + raw_api_allow_sensitive: bool = Field( + default=False, + description=( + "Allow gitea_request to reach admin/credential endpoints " + "(/admin, *tokens*, *secrets*, *hooks*, *keys*, applications/oauth2, " + "runner registration tokens). Disabled by default." 
async def raw_request(
    self,
    method: str,
    endpoint: str,
    *,
    params: dict[str, Any] | None = None,
    json_body: dict[str, Any] | None = None,
) -> Any:
    """Dispatch an arbitrary Gitea REST request for the ``gitea_request`` tool.

    Only the method and the endpoint are written to the audit log; ``params``
    and ``json_body`` are forwarded to ``self._request`` but never logged, so
    secrets embedded in payloads are not persisted.

    Args:
        method: HTTP method forwarded unchanged to ``self._request``.
        endpoint: Request path forwarded unchanged to ``self._request``.
        params: Optional query-string parameters.
        json_body: Optional JSON request body.

    Returns:
        Whatever ``self._request`` returns for the call.

    Raises:
        Exception: Any error raised by ``self._request`` is audited with
            ``result_status="error"`` and then re-raised unchanged.
    """
    audit_fields = {"method": method, "path": endpoint}
    correlation_id = self.audit.log_tool_invocation(
        tool_name="gitea_request",
        params=dict(audit_fields),
        result_status="pending",
    )
    try:
        result = await self._request(
            method,
            endpoint,
            correlation_id=correlation_id,
            params=params,
            json_body=json_body,
        )
    except Exception as exc:
        self.audit.log_tool_invocation(
            tool_name="gitea_request",
            correlation_id=correlation_id,
            result_status="error",
            params=dict(audit_fields),
            error=str(exc),
        )
        raise
    else:
        # The success entry is written outside the ``try`` on purpose: if the
        # audit sink itself fails here, the request has already succeeded and
        # must not additionally be recorded as a request "error".
        self.audit.log_tool_invocation(
            tool_name="gitea_request",
            correlation_id=correlation_id,
            result_status="success",
            params=dict(audit_fields),
        )
        return result
import re
from urllib.parse import unquote, urlsplit

# NOTE: the same patch also adds ``field_validator`` to the module's existing
# ``from pydantic import (...)`` block; ``RawApiRequestArgs`` depends on it.

# --- Raw API dispatch (gitea_request escape hatch) -------------------------

# HTTP methods the generic dispatch tool accepts. Everything outside GET/HEAD is
# treated as a write so the policy/write-mode gate applies.
RAW_API_METHODS = ("GET", "HEAD", "POST", "PUT", "PATCH", "DELETE")
_RAW_WRITE_METHODS = frozenset({"POST", "PUT", "PATCH", "DELETE"})

# Path segments/subpaths blocked for *every* method unless explicitly overridden
# via RAW_API_ALLOW_SENSITIVE. A GET on these already leaks credentials or
# privileged configuration, so they are denied independently of policy.yaml.
_RAW_SENSITIVE_SEGMENTS = frozenset({"admin", "tokens", "secrets", "hooks", "keys", "gpg_keys"})
_RAW_SENSITIVE_SUBPATHS = ("applications/oauth2", "actions/runners/registration-token")

# Endpoints under /repos/ that are not scoped to a single repository.
_RAW_CROSS_REPO_OWNERS = frozenset({"search", "issues"})

# Resources whose trailing segments form a file path target for policy checks.
_RAW_FILE_RESOURCES = frozenset({"contents", "raw", "media"})

# Upper bound on repeated percent-decoding when canonicalizing a path for the
# security checks; enough for double/triple-encoded input without unbounded work.
_RAW_MAX_DECODE_PASSES = 5


def _raw_security_segments(path: str) -> list[str]:
    """Return the segments of ``path`` after percent-decoding until stable.

    Used only for security decisions (traversal and denylist checks), never to
    build the dispatched URL. Decoding is repeated (bounded) so ``%252e%252e``
    is treated like ``%2e%2e`` and ``..``; encoded slashes and backslashes are
    split into separate segments as well.
    """
    decoded = path
    for _ in range(_RAW_MAX_DECODE_PASSES):
        step = unquote(decoded)
        if step == decoded:
            break
        decoded = step
    return [seg for seg in decoded.replace("\\", "/").split("/") if seg]


def normalize_raw_endpoint(path: str) -> str:
    """Normalize a raw API path into an ``/api/v1``-prefixed endpoint.

    Accepts a bare path (``/repos/o/r``), an already-prefixed path
    (``/api/v1/repos/o/r``), or a full URL (the scheme/host and any query string
    are stripped — the separate ``query`` argument carries query parameters).
    Backslashes are treated as slashes and empty / ``.`` segments are dropped.
    Percent-escapes are preserved in the returned endpoint.

    Raises:
        ValueError: When the path contains a ``..`` traversal segment, either
            literally or after percent-decoding (``%2e%2e``, ``a%2F..%2Fb``).
    """
    candidate = path.strip()
    split = urlsplit(candidate)
    # When a full URL is supplied, keep only its path component.
    raw_path = split.path if (split.scheme or split.netloc) else candidate
    # Drop any query/fragment a caller may have inlined into the path string.
    raw_path = raw_path.split("?", 1)[0].split("#", 1)[0]
    raw_path = raw_path.replace("\\", "/")
    segments = [seg for seg in raw_path.split("/") if seg and seg != "."]
    # Check the decoded form too: a server or proxy that decodes the path would
    # otherwise see a traversal this validator never looked at.
    if ".." in segments or ".." in _raw_security_segments(raw_path):
        raise ValueError("path must not contain '..' traversal segments")
    rel_segments = segments[2:] if segments[:2] == ["api", "v1"] else segments
    if not rel_segments:
        return "/api/v1"
    return "/api/v1/" + "/".join(rel_segments)


def _raw_relative_segments(endpoint: str) -> list[str]:
    """Return the endpoint segments after the ``/api/v1`` prefix."""
    segments = [seg for seg in endpoint.split("/") if seg]
    return segments[2:] if segments[:2] == ["api", "v1"] else segments


def raw_top_segment(endpoint: str) -> str:
    """Return the first path segment after ``/api/v1`` for coarse policy grouping.

    The segment is percent-decoded so ``%72epos`` and ``repos`` map to the same
    policy group. Returns ``""`` when the endpoint has no segment after the prefix.
    """
    rel = _raw_relative_segments(endpoint)
    return unquote(rel[0]) if rel else ""


def raw_method_is_write(method: str) -> bool:
    """Return whether an HTTP method mutates state (POST/PUT/PATCH/DELETE)."""
    return method.upper() in _RAW_WRITE_METHODS


def raw_is_sensitive(endpoint: str) -> bool:
    """Return whether an endpoint touches an admin/credential surface.

    Both the literal segments and their percent-decoded form are matched against
    the denylist, so an encoded spelling such as ``%61dmin`` or
    ``applications%2Foauth2`` is flagged exactly like the plain one. Everything
    the literal check flags is still flagged.
    """
    rel = _raw_relative_segments(endpoint)
    for segments in (rel, _raw_security_segments("/".join(rel))):
        if any(seg in _RAW_SENSITIVE_SEGMENTS for seg in segments):
            return True
        joined = "/".join(segments)
        if any(sub in joined for sub in _RAW_SENSITIVE_SUBPATHS):
            return True
    return False


def _raw_repo_segments(endpoint: str) -> list[str] | None:
    """Return ``[owner, repo, *rest]`` for a single-repository endpoint, else None.

    ``owner`` and ``repo`` are percent-decoded before validation so an encoded
    spelling resolves to the real repository instead of falling through as a
    repo-less request. ``rest`` is returned as written in the endpoint.
    """
    rel = _raw_relative_segments(endpoint)
    if len(rel) < 3 or unquote(rel[0]) != "repos":
        return None
    owner, repo = unquote(rel[1]), unquote(rel[2])
    if owner in _RAW_CROSS_REPO_OWNERS:
        return None
    # _REPO_PART_PATTERN is defined elsewhere in this module.
    if not (re.match(_REPO_PART_PATTERN, owner) and re.match(_REPO_PART_PATTERN, repo)):
        return None
    return [owner, repo, *rel[3:]]
def parse_raw_repository(endpoint: str) -> str | None:
    """Parse ``owner/repo`` from a repo-scoped endpoint; None for cross-repo paths.

    Delegates to ``_raw_repo_segments`` and therefore also returns None when the
    endpoint is not under ``/repos/`` or the owner/repo parts fail validation.
    """
    repo_segments = _raw_repo_segments(endpoint)
    if repo_segments is None:
        return None
    owner, repo = repo_segments[0], repo_segments[1]
    return f"{owner}/{repo}"


def parse_raw_target_path(endpoint: str) -> str | None:
    """Parse a file-path target from ``contents``/``raw``/``media`` endpoints.

    The returned path is percent-decoded and canonicalized (backslashes become
    slashes; empty and ``.`` segments are dropped) so repository path allow/deny
    rules are evaluated against the path as it would be resolved, not against
    an encoded spelling such as ``secrets%2Fprod.env``.
    NOTE(review): assumes the Gitea server percent-decodes the path before
    resolving the file — standard URL handling, but confirm against the
    deployed version.

    Returns:
        The decoded repository-relative file path, or None when the endpoint is
        not a single-repository file endpoint or carries no file path.
    """
    # Function-scope import keeps this block self-contained within the module.
    from urllib.parse import unquote

    repo_segments = _raw_repo_segments(endpoint)
    if repo_segments is None or len(repo_segments) < 4:
        return None
    if repo_segments[2] not in _RAW_FILE_RESOURCES:
        return None
    decoded = unquote("/".join(repo_segments[3:])).replace("\\", "/")
    file_path = "/".join(part for part in decoded.split("/") if part and part != ".")
    return file_path or None


class RawApiRequestArgs(StrictBaseModel):
    """Arguments for the generic ``gitea_request`` escape-hatch tool.

    ``method`` is upper-cased before validation, and ``path`` is checked with
    ``normalize_raw_endpoint`` so traversal attempts are rejected at schema
    validation time. The stored ``path`` is left as supplied; callers normalize
    it again when dispatching.
    """

    method: Literal["GET", "HEAD", "POST", "PUT", "PATCH", "DELETE"] = Field(
        ..., description="HTTP method"
    )
    path: str = Field(..., min_length=1, max_length=2048, description="Gitea REST path")
    query: dict[str, Any] | None = Field(
        default=None, description="Optional query-string parameters"
    )
    body: dict[str, Any] | None = Field(default=None, description="Optional JSON request body")

    @field_validator("method", mode="before")
    @classmethod
    def _normalize_method(cls, value: object) -> object:
        """Uppercase the method before enum validation so 'get' is accepted."""
        if isinstance(value, str):
            return value.strip().upper()
        # Non-string input is passed through for the Literal check to reject.
        return value

    @model_validator(mode="after")
    def _validate_path(self) -> RawApiRequestArgs:
        """Reject path traversal up front so the handler sees a clean endpoint."""
        # Called only for its ValueError side effect; the result is discarded.
        normalize_raw_endpoint(self.path)
        return self
@@ -459,6 +593,16 @@ def extract_repository(arguments: dict[str, object]) -> str | None: repo = arguments.get("repo") if isinstance(owner, str) and isinstance(repo, str) and owner and repo: return f"{owner}/{repo}" + # Raw API dispatch: derive the repository from the request path so the central + # policy gate and the service-PAT per-user permission check evaluate the real + # target instead of treating every raw call as repo-less. + path = arguments.get("path") + method = arguments.get("method") + if isinstance(path, str) and isinstance(method, str): + try: + return parse_raw_repository(normalize_raw_endpoint(path)) + except ValueError: + return None return None @@ -467,4 +611,13 @@ def extract_target_path(arguments: dict[str, object]) -> str | None: filepath = arguments.get("filepath") if isinstance(filepath, str) and filepath: return filepath + # Raw API dispatch: expose the file path embedded in contents/raw/media + # endpoints so repository path allow/deny rules still apply to raw calls. + path = arguments.get("path") + method = arguments.get("method") + if isinstance(path, str) and isinstance(method, str): + try: + return parse_raw_target_path(normalize_raw_endpoint(path)) + except ValueError: + return None return None diff --git a/src/aegis_gitea_mcp/tools/raw_tools.py b/src/aegis_gitea_mcp/tools/raw_tools.py new file mode 100644 index 0000000..e73ca64 --- /dev/null +++ b/src/aegis_gitea_mcp/tools/raw_tools.py @@ -0,0 +1,129 @@ +"""Generic raw Gitea REST dispatch tool (escape hatch). + +``gitea_request`` exposes the long tail of the Gitea API that the curated, typed +tools do not cover. A single tool surface would normally collapse the +granularity of ``policy.yaml``, so this handler re-derives a coarse virtual tool +name (``gitea_request:METHOD:topsegment``) and the target repository/path +from each request and runs them back through the policy engine.
from __future__ import annotations

import json
from typing import Any

from fastapi import HTTPException

from aegis_gitea_mcp.audit import get_audit_logger
from aegis_gitea_mcp.config import get_settings
from aegis_gitea_mcp.gitea_client import (
    GiteaAuthenticationError,
    GiteaAuthorizationError,
    GiteaClient,
    GiteaError,
)
from aegis_gitea_mcp.policy import get_policy_engine
from aegis_gitea_mcp.response_limits import limit_items, limit_text
from aegis_gitea_mcp.tools.arguments import (
    RawApiRequestArgs,
    normalize_raw_endpoint,
    parse_raw_repository,
    parse_raw_target_path,
    raw_is_sensitive,
    raw_method_is_write,
    raw_top_segment,
)

# Query parameters the Gitea API documents as credentials (``token``,
# ``access_token``) or as an impersonation switch (``sudo``). They are part of
# the same credential surface as the sensitive-path denylist.
_RAW_SENSITIVE_QUERY_KEYS = frozenset({"sudo", "token", "access_token"})


def _bound_response(data: Any) -> dict[str, Any]:
    """Bound a raw response into stable, size-limited envelope fields.

    * list: passed through ``limit_items``; returns ``data``, ``count`` (items
      kept) and ``omitted``.
    * dict: serialized to JSON and passed through ``limit_text``; when that
      shortens the text, the truncated JSON *string* is returned with
      ``truncated=True``, otherwise the original dict with ``truncated=False``.
    * str: passed through ``limit_text``.
    * anything else: returned unchanged under ``data``.
    """
    if isinstance(data, list):
        bounded, omitted = limit_items(list(data))
        return {"data": bounded, "count": len(bounded), "omitted": omitted}
    if isinstance(data, dict):
        serialized = json.dumps(data, ensure_ascii=False, default=str)
        capped = limit_text(serialized)
        if len(capped) < len(serialized):
            # Oversized dict: return a truncated JSON string instead of the object.
            return {"data": capped, "truncated": True}
        return {"data": data, "truncated": False}
    if isinstance(data, str):
        return {"data": limit_text(data)}
    return {"data": data}


def _sensitive_query_keys(query: dict[str, Any] | None) -> list[str]:
    """Return the sorted query keys that carry credentials or impersonation."""
    if not query:
        return []
    return sorted(
        key for key in query if str(key).strip().lower() in _RAW_SENSITIVE_QUERY_KEYS
    )


async def raw_api_request_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
    """Dispatch an arbitrary Gitea REST endpoint subject to policy and denylists.

    Re-deriving a virtual tool name plus repository/path and running them back
    through the policy engine reuses the existing write-mode + write-whitelist
    enforcement and keeps per-method/per-repo policy control intact behind the
    single tool. Two layers of authorization apply:

    * The central dispatch gate in ``server.py`` allows/denies the registered
      ``gitea_request`` name and, in service-PAT mode, verifies the signed-in
      user's permission on the parsed repository.
    * This handler then authorizes the fine-grained virtual tool name and
      enforces a built-in admin/credential denylist that ``policy.yaml`` cannot
      re-open.

    Args:
        gitea: Client used to dispatch the request.
        arguments: Raw tool arguments, validated with ``RawApiRequestArgs``.

    Returns:
        An envelope with ``method``, ``path``, ``write`` and ``repository`` plus
        the size-bounded response fields produced by ``_bound_response``.

    Raises:
        HTTPException: 403 when raw dispatch is disabled, the path or query
            touches the credential surface, or the policy engine denies the call.
        GiteaAuthenticationError, GiteaAuthorizationError: Propagated unchanged.
        RuntimeError: Wraps any other ``GiteaError`` from the dispatch.
    """
    settings = get_settings()
    audit = get_audit_logger()

    if not settings.raw_api_enabled:
        raise HTTPException(
            status_code=403,
            detail="Raw API dispatch is disabled (set RAW_API_ENABLED=true to enable).",
        )

    parsed = RawApiRequestArgs.model_validate(arguments)
    method = parsed.method
    endpoint = normalize_raw_endpoint(parsed.path)
    is_write = raw_method_is_write(method)
    # Derived up front so every denial below can be audited with its target.
    repository = parse_raw_repository(endpoint)
    target_path = parse_raw_target_path(endpoint)

    # Admin/credential denylist applies to every method and cannot be re-opened
    # from policy.yaml — only RAW_API_ALLOW_SENSITIVE overrides it.
    if not settings.raw_api_allow_sensitive:
        if raw_is_sensitive(endpoint):
            audit.log_access_denied(
                tool_name="gitea_request",
                repository=repository,
                reason="raw_sensitive_path_denied",
            )
            raise HTTPException(
                status_code=403,
                detail=(
                    "Endpoint targets an admin/credential surface blocked by the raw-API "
                    "sensitive-path denylist."
                ),
            )
        # The path denylist alone does not cover the query string: ``sudo``
        # switches the acting user and ``token``/``access_token`` swap the
        # credential. Only key names are reported — never their values.
        blocked_keys = _sensitive_query_keys(parsed.query)
        if blocked_keys:
            audit.log_access_denied(
                tool_name="gitea_request",
                repository=repository,
                reason="raw_sensitive_query_denied",
            )
            raise HTTPException(
                status_code=403,
                detail=(
                    "Query parameters blocked by the raw-API credential denylist: "
                    + ", ".join(blocked_keys)
                ),
            )

    # Coarse, stable virtual tool name so policy.yaml can allow/deny by method +
    # top-level path segment (policy matches tool names by exact set membership).
    policy_tool_name = f"gitea_request:{method}:{raw_top_segment(endpoint)}"
    decision = get_policy_engine().authorize(
        tool_name=policy_tool_name,
        is_write=is_write,
        repository=repository,
        target_path=target_path,
    )
    if not decision.allowed:
        audit.log_access_denied(
            tool_name=policy_tool_name,
            repository=repository,
            reason=decision.reason,
        )
        raise HTTPException(status_code=403, detail=f"Policy denied raw request: {decision.reason}")

    try:
        data = await gitea.raw_request(method, endpoint, params=parsed.query, json_body=parsed.body)
    except (GiteaAuthenticationError, GiteaAuthorizationError):
        # Let auth/authz failures surface so the server returns actionable
        # re-authorization guidance instead of a generic internal error.
        raise
    except GiteaError as exc:
        raise RuntimeError(f"Raw API request failed: {exc}") from exc

    envelope: dict[str, Any] = {
        "method": method,
        "path": endpoint,
        "write": is_write,
        "repository": repository,
    }
    envelope.update(_bound_response(data))
    return envelope