feat: safe full-API coverage via classified gitea_request dispatch
Add a deterministic (method, path) read/write classifier with an explicit render-only override table that can only downgrade provably side-effect-free POSTs (markdown/markup) to reads, never the reverse — so a mutating call cannot slip past the write-mode gate. Add a known-Gitea-prefix gate: gitea_request now fails closed on any path whose top segment is not a recognized /api/v1 route instead of passing unknown paths through. Expose raw_relative_segments for the authorization layer. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -468,6 +468,67 @@ _RAW_CROSS_REPO_OWNERS = frozenset({"search", "issues"})
|
||||
# Resources whose trailing segments form a file path target for policy checks.
|
||||
_RAW_FILE_RESOURCES = frozenset({"contents", "raw", "media"})
|
||||
|
||||
# Known top-level segments of the Gitea ``/api/v1`` surface. A raw request whose
|
||||
# first path segment is not in this set is rejected (fail closed): we never pass
|
||||
# an unrecognized path straight through to Gitea.
|
||||
KNOWN_API_PREFIXES = frozenset(
|
||||
{
|
||||
"activitypub",
|
||||
"admin",
|
||||
"gitignore",
|
||||
"issues",
|
||||
"label",
|
||||
"licenses",
|
||||
"markdown",
|
||||
"markup",
|
||||
"miscellaneous",
|
||||
"nodeinfo",
|
||||
"notifications",
|
||||
"org",
|
||||
"orgs",
|
||||
"packages",
|
||||
"repos",
|
||||
"repositories",
|
||||
"settings",
|
||||
"signing-key.gpg",
|
||||
"teams",
|
||||
"topics",
|
||||
"user",
|
||||
"users",
|
||||
"version",
|
||||
}
|
||||
)
|
||||
|
||||
# Override table: provably side-effect-free POSTs that may be treated as reads so
|
||||
# they do not needlessly require WRITE_MODE. This table may ONLY ever DOWNGRADE a
|
||||
# write to a read for endpoints that render content and mutate nothing — never
|
||||
# the reverse. Keyed by the final path segment of the endpoint.
|
||||
_RAW_READ_ONLY_POST_LEAVES = frozenset({"markdown", "markup", "raw"})
|
||||
|
||||
|
||||
def raw_is_known_api_path(endpoint: str) -> bool:
|
||||
"""Return whether the endpoint's top segment is a known Gitea API prefix."""
|
||||
return raw_top_segment(endpoint) in KNOWN_API_PREFIXES
|
||||
|
||||
|
||||
def raw_request_is_write(method: str, endpoint: str) -> bool:
|
||||
"""Classify a raw request as read or write from its method and path.
|
||||
|
||||
``GET``/``HEAD`` are reads; every other method is a write — except for the
|
||||
small, explicit override table of render-only POSTs (e.g. markdown/markup),
|
||||
which are reads. The override can only make a request *more* permissive for
|
||||
provably side-effect-free endpoints; it never reclassifies a mutating call as
|
||||
a read, so a misclassified write cannot slip past the write-mode gate.
|
||||
"""
|
||||
upper = method.upper()
|
||||
if upper in {"GET", "HEAD"}:
|
||||
return False
|
||||
if upper == "POST":
|
||||
rel = _raw_relative_segments(endpoint)
|
||||
if rel and rel[-1] in _RAW_READ_ONLY_POST_LEAVES:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def normalize_raw_endpoint(path: str) -> str:
|
||||
"""Normalize a raw API path into an ``/api/v1``-prefixed endpoint.
|
||||
@@ -501,6 +562,11 @@ def _raw_relative_segments(endpoint: str) -> list[str]:
|
||||
return segments[2:] if segments[:2] == ["api", "v1"] else segments
|
||||
|
||||
|
||||
def raw_relative_segments(endpoint: str) -> list[str]:
|
||||
"""Return the endpoint path segments after the ``/api/v1`` prefix (public)."""
|
||||
return _raw_relative_segments(endpoint)
|
||||
|
||||
|
||||
def raw_top_segment(endpoint: str) -> str:
|
||||
"""Return the first path segment after ``/api/v1`` for coarse policy grouping."""
|
||||
rel = _raw_relative_segments(endpoint)
|
||||
|
||||
@@ -38,8 +38,9 @@ from aegis_gitea_mcp.tools.arguments import (
|
||||
normalize_raw_endpoint,
|
||||
parse_raw_repository,
|
||||
parse_raw_target_path,
|
||||
raw_is_known_api_path,
|
||||
raw_is_sensitive,
|
||||
raw_method_is_write,
|
||||
raw_request_is_write,
|
||||
raw_top_segment,
|
||||
)
|
||||
|
||||
@@ -75,7 +76,21 @@ async def raw_api_request_tool(gitea: GiteaClient, arguments: dict[str, Any]) ->
|
||||
parsed = RawApiRequestArgs.model_validate(arguments)
|
||||
method = parsed.method
|
||||
endpoint = normalize_raw_endpoint(parsed.path)
|
||||
is_write = raw_method_is_write(method)
|
||||
|
||||
# Fail closed on paths that do not match a known Gitea API prefix: an
|
||||
# unrecognized path is never passed straight through to the backend.
|
||||
if not raw_is_known_api_path(endpoint):
|
||||
audit.log_access_denied(tool_name="gitea_request", reason="raw_unknown_path_denied")
|
||||
raise ToolError(
|
||||
"Endpoint does not match a known Gitea API route prefix.",
|
||||
status_code=403,
|
||||
)
|
||||
|
||||
# Deterministic read/write classification (override-aware): a non-GET/HEAD
|
||||
# method is a write unless it is in the explicit render-only override table,
|
||||
# so a mutating call can never be misclassified as a read and slip past the
|
||||
# write-mode gate.
|
||||
is_write = raw_request_is_write(method, endpoint)
|
||||
|
||||
# Admin/credential denylist applies to every method and cannot be re-opened
|
||||
# from policy.yaml — only RAW_API_ALLOW_SENSITIVE overrides it.
|
||||
|
||||
@@ -0,0 +1,128 @@
|
||||
"""Tests for the gitea_request read/write classifier and known-path gate."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from aegis_gitea_mcp.config import reset_settings
|
||||
from aegis_gitea_mcp.errors import ToolError
|
||||
from aegis_gitea_mcp.tools.arguments import (
|
||||
normalize_raw_endpoint,
|
||||
raw_is_known_api_path,
|
||||
raw_request_is_write,
|
||||
)
|
||||
from aegis_gitea_mcp.tools.raw_tools import raw_api_request_tool
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def raw_env(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
|
||||
"""API-key-mode settings with default policy (read allow, write deny)."""
|
||||
reset_settings()
|
||||
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
||||
monkeypatch.setenv("GITEA_TOKEN", "test-token")
|
||||
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
|
||||
monkeypatch.setenv("ENVIRONMENT", "test")
|
||||
monkeypatch.setenv("POLICY_FILE_PATH", str(tmp_path / "missing-policy.yaml"))
|
||||
|
||||
|
||||
class StubRawGitea:
|
||||
"""Stub Gitea client capturing raw_request calls."""
|
||||
|
||||
def __init__(self, response: Any = None) -> None:
|
||||
self._response: Any = {"ok": True} if response is None else response
|
||||
self.calls: list[dict[str, Any]] = []
|
||||
|
||||
async def raw_request(
|
||||
self,
|
||||
method: str,
|
||||
endpoint: str,
|
||||
*,
|
||||
params: dict[str, Any] | None = None,
|
||||
json_body: dict[str, Any] | None = None,
|
||||
) -> Any:
|
||||
self.calls.append({"method": method, "endpoint": endpoint})
|
||||
return self._response
|
||||
|
||||
|
||||
# --- Pure classifier --------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("method", "path", "expected_write"),
|
||||
[
|
||||
("GET", "/repos/o/r/issues", False),
|
||||
("HEAD", "/repos/o/r", False),
|
||||
("POST", "/repos/o/r/issues", True),
|
||||
("PUT", "/repos/o/r/pulls/1/merge", True),
|
||||
("PATCH", "/repos/o/r/issues/1", True),
|
||||
("DELETE", "/repos/o/r/issues/1", True),
|
||||
# Render-only overrides are reads even though they are POSTs.
|
||||
("POST", "/markdown", False),
|
||||
("POST", "/markdown/raw", False),
|
||||
("POST", "/repos/o/r/markup", False),
|
||||
],
|
||||
)
|
||||
def test_raw_request_is_write(method: str, path: str, expected_write: bool) -> None:
|
||||
endpoint = normalize_raw_endpoint(path)
|
||||
assert raw_request_is_write(method, endpoint) is expected_write
|
||||
|
||||
|
||||
def test_override_never_upgrades_a_mutating_post() -> None:
|
||||
"""A normal mutating POST is never reclassified as a read."""
|
||||
endpoint = normalize_raw_endpoint("/repos/o/r/issues")
|
||||
assert raw_request_is_write("POST", endpoint) is True
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("path", "known"),
|
||||
[
|
||||
("/repos/o/r", True),
|
||||
("/orgs/acme/repos", True),
|
||||
("/admin/users", True),
|
||||
("/user/repos", True),
|
||||
("/markdown", True),
|
||||
("/version", True),
|
||||
("/definitely/not/a/real/prefix", False),
|
||||
("/wibble", False),
|
||||
],
|
||||
)
|
||||
def test_raw_is_known_api_path(path: str, known: bool) -> None:
|
||||
assert raw_is_known_api_path(normalize_raw_endpoint(path)) is known
|
||||
|
||||
|
||||
# --- Handler: unknown path is denied before any network call ----------------
|
||||
|
||||
|
||||
async def test_unknown_prefix_denied_before_network(raw_env: None) -> None:
|
||||
stub = StubRawGitea()
|
||||
with pytest.raises(ToolError) as exc_info:
|
||||
await raw_api_request_tool(stub, {"method": "GET", "path": "/wibble/wobble"})
|
||||
assert exc_info.value.status_code == 403
|
||||
assert "known Gitea API route prefix" in str(exc_info.value.detail)
|
||||
assert stub.calls == []
|
||||
|
||||
|
||||
# --- Write-mode bypass: a write that "looks like a read" is still a write ----
|
||||
|
||||
|
||||
async def test_write_method_denied_with_write_mode_off_even_on_readish_path(
|
||||
raw_env: None,
|
||||
) -> None:
|
||||
"""A POST to a known repo path is a write and is denied while write-mode is off."""
|
||||
stub = StubRawGitea()
|
||||
with pytest.raises(ToolError) as exc_info:
|
||||
await raw_api_request_tool(stub, {"method": "POST", "path": "/repos/acme/app/issues"})
|
||||
assert exc_info.value.status_code == 403
|
||||
assert "write mode is disabled" in str(exc_info.value.detail)
|
||||
assert stub.calls == []
|
||||
|
||||
|
||||
async def test_render_only_post_allowed_as_read_without_write_mode(raw_env: None) -> None:
|
||||
"""A markdown-render POST is classified read and proceeds with write-mode off."""
|
||||
stub = StubRawGitea({"rendered": "<p>hi</p>"})
|
||||
result = await raw_api_request_tool(stub, {"method": "POST", "path": "/markdown"})
|
||||
assert result["write"] is False
|
||||
assert stub.calls and stub.calls[0]["endpoint"] == "/api/v1/markdown"
|
||||
Reference in New Issue
Block a user