test(raw-api): cover gitea_request handler and path parsing
docker / lint (push) Successful in 38s
docker / test (push) Successful in 33s
docker / test (pull_request) Successful in 32s
test / test (push) Successful in 40s
lint / lint (push) Successful in 42s
docker / lint (pull_request) Successful in 39s
test / test (pull_request) Successful in 39s
lint / lint (pull_request) Successful in 40s
docker / docker (pull_request) Successful in 31s
docker / docker (push) Successful in 44s
docker / lint (push) Successful in 38s
docker / test (push) Successful in 33s
docker / test (pull_request) Successful in 32s
test / test (push) Successful in 40s
lint / lint (push) Successful in 42s
docker / lint (pull_request) Successful in 39s
test / test (pull_request) Successful in 39s
lint / lint (pull_request) Successful in 40s
docker / docker (pull_request) Successful in 31s
docker / docker (push) Successful in 44s
Covers read allow + repository parsing, write denied without write-mode, write allowed only for whitelisted repos, non-repo write denial, sensitive-path denial (incl. GET) and override, cross-repo search handling, unknown-method and traversal rejection before any network call, killswitch, response truncation, and the raw path-parsing helpers and raw-aware extractors. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,321 @@
|
||||
"""Tests for the generic gitea_request raw API dispatch tool."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
from fastapi import HTTPException
|
||||
from pydantic import ValidationError
|
||||
|
||||
from aegis_gitea_mcp.config import reset_settings
|
||||
from aegis_gitea_mcp.tools.arguments import (
|
||||
extract_repository,
|
||||
extract_target_path,
|
||||
normalize_raw_endpoint,
|
||||
parse_raw_repository,
|
||||
parse_raw_target_path,
|
||||
raw_is_sensitive,
|
||||
raw_top_segment,
|
||||
)
|
||||
from aegis_gitea_mcp.tools.raw_tools import raw_api_request_tool
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def raw_env(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
|
||||
"""Minimal API-key-mode settings with policy that allows reads, denies writes."""
|
||||
reset_settings()
|
||||
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
||||
monkeypatch.setenv("GITEA_TOKEN", "test-token")
|
||||
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
|
||||
monkeypatch.setenv("ENVIRONMENT", "test")
|
||||
# Point at a non-existent policy file so the default config applies
|
||||
# (read: allow, write: deny) and tests do not depend on the repo policy.yaml.
|
||||
monkeypatch.setenv("POLICY_FILE_PATH", str(tmp_path / "missing-policy.yaml"))
|
||||
|
||||
|
||||
class StubRawGitea:
|
||||
"""Stub Gitea client capturing raw_request calls."""
|
||||
|
||||
def __init__(self, response: Any = None) -> None:
|
||||
self._response: Any = {"ok": True} if response is None else response
|
||||
self.calls: list[dict[str, Any]] = []
|
||||
|
||||
async def raw_request(
|
||||
self,
|
||||
method: str,
|
||||
endpoint: str,
|
||||
*,
|
||||
params: dict[str, Any] | None = None,
|
||||
json_body: dict[str, Any] | None = None,
|
||||
) -> Any:
|
||||
self.calls.append(
|
||||
{"method": method, "endpoint": endpoint, "params": params, "json_body": json_body}
|
||||
)
|
||||
return self._response
|
||||
|
||||
|
||||
# --- Handler behavior ------------------------------------------------------
|
||||
|
||||
|
||||
async def test_get_repo_endpoint_allowed_and_parses_repository(raw_env: None) -> None:
|
||||
"""A GET on a repo endpoint is allowed and parses owner/repo from the path."""
|
||||
stub = StubRawGitea({"number": 1})
|
||||
result = await raw_api_request_tool(stub, {"method": "GET", "path": "/repos/acme/app/pulls/1"})
|
||||
|
||||
assert result["method"] == "GET"
|
||||
assert result["path"] == "/api/v1/repos/acme/app/pulls/1"
|
||||
assert result["write"] is False
|
||||
assert result["repository"] == "acme/app"
|
||||
assert result["data"] == {"number": 1}
|
||||
assert stub.calls[0]["endpoint"] == "/api/v1/repos/acme/app/pulls/1"
|
||||
|
||||
|
||||
async def test_lowercase_method_is_normalized(raw_env: None) -> None:
|
||||
"""A lowercase method is uppercased and accepted."""
|
||||
stub = StubRawGitea([{"id": 1}])
|
||||
result = await raw_api_request_tool(stub, {"method": "get", "path": "/repos/acme/app/issues"})
|
||||
assert result["method"] == "GET"
|
||||
assert result["count"] == 1
|
||||
|
||||
|
||||
async def test_delete_denied_when_write_mode_off(raw_env: None) -> None:
|
||||
"""A write method is denied (no network call) while write-mode is disabled."""
|
||||
stub = StubRawGitea()
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await raw_api_request_tool(stub, {"method": "DELETE", "path": "/repos/acme/app/issues/1"})
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
assert "write mode is disabled" in str(exc_info.value.detail)
|
||||
assert stub.calls == []
|
||||
|
||||
|
||||
async def test_write_allowed_with_write_mode_and_whitelist(
|
||||
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
|
||||
) -> None:
|
||||
"""A write succeeds only when write-mode is on, the repo is whitelisted, and policy allows."""
|
||||
policy_file = tmp_path / "policy.yaml"
|
||||
policy_file.write_text("defaults:\n read: allow\n write: allow\n", encoding="utf-8")
|
||||
|
||||
reset_settings()
|
||||
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
||||
monkeypatch.setenv("GITEA_TOKEN", "test-token")
|
||||
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
|
||||
monkeypatch.setenv("ENVIRONMENT", "test")
|
||||
monkeypatch.setenv("POLICY_FILE_PATH", str(policy_file))
|
||||
monkeypatch.setenv("WRITE_MODE", "true")
|
||||
monkeypatch.setenv("WRITE_REPOSITORY_WHITELIST", "acme/app")
|
||||
|
||||
stub = StubRawGitea({"merged": True})
|
||||
result = await raw_api_request_tool(
|
||||
stub,
|
||||
{"method": "PUT", "path": "/repos/acme/app/pulls/1/merge", "body": {"Do": "merge"}},
|
||||
)
|
||||
|
||||
assert result["write"] is True
|
||||
assert result["repository"] == "acme/app"
|
||||
assert stub.calls[0]["json_body"] == {"Do": "merge"}
|
||||
|
||||
|
||||
async def test_write_denied_for_repo_outside_whitelist(
|
||||
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
|
||||
) -> None:
|
||||
"""A write on a repo not in the whitelist is denied even with write-mode on."""
|
||||
policy_file = tmp_path / "policy.yaml"
|
||||
policy_file.write_text("defaults:\n read: allow\n write: allow\n", encoding="utf-8")
|
||||
|
||||
reset_settings()
|
||||
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
||||
monkeypatch.setenv("GITEA_TOKEN", "test-token")
|
||||
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
|
||||
monkeypatch.setenv("ENVIRONMENT", "test")
|
||||
monkeypatch.setenv("POLICY_FILE_PATH", str(policy_file))
|
||||
monkeypatch.setenv("WRITE_MODE", "true")
|
||||
monkeypatch.setenv("WRITE_REPOSITORY_WHITELIST", "acme/other")
|
||||
|
||||
stub = StubRawGitea()
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await raw_api_request_tool(stub, {"method": "POST", "path": "/repos/acme/app/issues"})
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
assert "whitelist" in str(exc_info.value.detail)
|
||||
assert stub.calls == []
|
||||
|
||||
|
||||
async def test_non_repository_write_denied(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
|
||||
"""A write that targets no repository is denied (secure default)."""
|
||||
policy_file = tmp_path / "policy.yaml"
|
||||
policy_file.write_text("defaults:\n read: allow\n write: allow\n", encoding="utf-8")
|
||||
|
||||
reset_settings()
|
||||
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
||||
monkeypatch.setenv("GITEA_TOKEN", "test-token")
|
||||
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
|
||||
monkeypatch.setenv("ENVIRONMENT", "test")
|
||||
monkeypatch.setenv("POLICY_FILE_PATH", str(policy_file))
|
||||
monkeypatch.setenv("WRITE_MODE", "true")
|
||||
monkeypatch.setenv("WRITE_REPOSITORY_WHITELIST", "acme/app")
|
||||
|
||||
stub = StubRawGitea()
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await raw_api_request_tool(stub, {"method": "POST", "path": "/user/repos"})
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
assert "repository target" in str(exc_info.value.detail)
|
||||
assert stub.calls == []
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"path",
|
||||
["/admin/users", "/users/bob/tokens", "/repos/acme/app/hooks", "/user/keys"],
|
||||
)
|
||||
async def test_sensitive_paths_denied_on_get(raw_env: None, path: str) -> None:
|
||||
"""Admin/credential surfaces are denied for every method, including GET."""
|
||||
stub = StubRawGitea()
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await raw_api_request_tool(stub, {"method": "GET", "path": path})
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
assert "sensitive-path denylist" in str(exc_info.value.detail)
|
||||
assert stub.calls == []
|
||||
|
||||
|
||||
async def test_sensitive_path_allowed_with_override(
|
||||
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
|
||||
) -> None:
|
||||
"""RAW_API_ALLOW_SENSITIVE bypasses the admin/credential denylist."""
|
||||
reset_settings()
|
||||
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
||||
monkeypatch.setenv("GITEA_TOKEN", "test-token")
|
||||
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
|
||||
monkeypatch.setenv("ENVIRONMENT", "test")
|
||||
monkeypatch.setenv("POLICY_FILE_PATH", str(tmp_path / "missing.yaml"))
|
||||
monkeypatch.setenv("RAW_API_ALLOW_SENSITIVE", "true")
|
||||
|
||||
stub = StubRawGitea([{"id": 1}])
|
||||
result = await raw_api_request_tool(stub, {"method": "GET", "path": "/admin/users"})
|
||||
assert result["data"] == [{"id": 1}]
|
||||
assert stub.calls[0]["endpoint"] == "/api/v1/admin/users"
|
||||
|
||||
|
||||
async def test_cross_repo_search_not_treated_as_repository(raw_env: None) -> None:
|
||||
"""/repos/issues/search is a cross-repo endpoint, so repository is None."""
|
||||
stub = StubRawGitea([{"id": 1}])
|
||||
result = await raw_api_request_tool(
|
||||
stub, {"method": "GET", "path": "/repos/issues/search", "query": {"q": "bug"}}
|
||||
)
|
||||
assert result["repository"] is None
|
||||
assert result["count"] == 1
|
||||
assert stub.calls[0]["params"] == {"q": "bug"}
|
||||
|
||||
|
||||
async def test_unknown_method_rejected_before_network(raw_env: None) -> None:
|
||||
"""An unknown HTTP method is rejected during validation before any network call."""
|
||||
stub = StubRawGitea()
|
||||
with pytest.raises(ValidationError):
|
||||
await raw_api_request_tool(stub, {"method": "OPTIONS", "path": "/repos/acme/app"})
|
||||
assert stub.calls == []
|
||||
|
||||
|
||||
async def test_path_traversal_rejected(raw_env: None) -> None:
|
||||
"""A path containing '..' is rejected during validation."""
|
||||
stub = StubRawGitea()
|
||||
with pytest.raises(ValidationError):
|
||||
await raw_api_request_tool(
|
||||
stub, {"method": "GET", "path": "/repos/acme/app/../../admin/users"}
|
||||
)
|
||||
assert stub.calls == []
|
||||
|
||||
|
||||
async def test_full_url_is_reduced_to_path(raw_env: None) -> None:
|
||||
"""A full URL is reduced to just the API path."""
|
||||
stub = StubRawGitea({"name": "app"})
|
||||
result = await raw_api_request_tool(
|
||||
stub,
|
||||
{
|
||||
"method": "GET",
|
||||
"path": "https://gitea.example.com/api/v1/repos/acme/app/contents/src/app.py?ref=main",
|
||||
},
|
||||
)
|
||||
assert result["path"] == "/api/v1/repos/acme/app/contents/src/app.py"
|
||||
assert result["repository"] == "acme/app"
|
||||
|
||||
|
||||
async def test_raw_api_disabled(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
|
||||
"""The killswitch disables every dispatch."""
|
||||
reset_settings()
|
||||
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
||||
monkeypatch.setenv("GITEA_TOKEN", "test-token")
|
||||
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
|
||||
monkeypatch.setenv("ENVIRONMENT", "test")
|
||||
monkeypatch.setenv("POLICY_FILE_PATH", str(tmp_path / "missing.yaml"))
|
||||
monkeypatch.setenv("RAW_API_ENABLED", "false")
|
||||
|
||||
stub = StubRawGitea()
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await raw_api_request_tool(stub, {"method": "GET", "path": "/repos/acme/app"})
|
||||
assert exc_info.value.status_code == 403
|
||||
assert "disabled" in str(exc_info.value.detail)
|
||||
assert stub.calls == []
|
||||
|
||||
|
||||
async def test_large_dict_response_is_truncated(raw_env: None) -> None:
|
||||
"""An oversized object response is returned as a truncated JSON string."""
|
||||
big = {"blob": "x" * 50_000}
|
||||
stub = StubRawGitea(big)
|
||||
result = await raw_api_request_tool(stub, {"method": "GET", "path": "/repos/acme/app"})
|
||||
assert result["truncated"] is True
|
||||
assert isinstance(result["data"], str)
|
||||
|
||||
|
||||
# --- Path parsing helpers --------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("path", "expected"),
|
||||
[
|
||||
("/repos/acme/app", "/api/v1/repos/acme/app"),
|
||||
("repos/acme/app", "/api/v1/repos/acme/app"),
|
||||
("/api/v1/repos/acme/app", "/api/v1/repos/acme/app"),
|
||||
("/", "/api/v1"),
|
||||
("", "/api/v1"),
|
||||
],
|
||||
)
|
||||
def test_normalize_raw_endpoint(path: str, expected: str) -> None:
|
||||
assert normalize_raw_endpoint(path) == expected
|
||||
|
||||
|
||||
def test_normalize_raw_endpoint_rejects_traversal() -> None:
|
||||
with pytest.raises(ValueError):
|
||||
normalize_raw_endpoint("/repos/acme/../admin")
|
||||
|
||||
|
||||
def test_parse_raw_repository_variants() -> None:
|
||||
assert parse_raw_repository("/api/v1/repos/acme/app/pulls/1") == "acme/app"
|
||||
assert parse_raw_repository("/api/v1/repos/search") is None
|
||||
assert parse_raw_repository("/api/v1/repos/issues/search") is None
|
||||
assert parse_raw_repository("/api/v1/user/repos") is None
|
||||
|
||||
|
||||
def test_parse_raw_target_path() -> None:
|
||||
assert parse_raw_target_path("/api/v1/repos/acme/app/contents/src/app.py") == "src/app.py"
|
||||
assert parse_raw_target_path("/api/v1/repos/acme/app/raw/README.md") == "README.md"
|
||||
assert parse_raw_target_path("/api/v1/repos/acme/app/pulls/1") is None
|
||||
|
||||
|
||||
def test_raw_top_segment_and_sensitivity() -> None:
|
||||
assert raw_top_segment("/api/v1/repos/acme/app") == "repos"
|
||||
assert raw_top_segment("/api/v1") == ""
|
||||
assert raw_is_sensitive("/api/v1/repos/acme/app/hooks") is True
|
||||
assert raw_is_sensitive("/api/v1/user/applications/oauth2") is True
|
||||
assert raw_is_sensitive("/api/v1/repos/acme/app/pulls") is False
|
||||
|
||||
|
||||
def test_extractors_are_raw_aware() -> None:
|
||||
raw_args = {"method": "GET", "path": "/repos/acme/app/contents/src/app.py"}
|
||||
assert extract_repository(raw_args) == "acme/app"
|
||||
assert extract_target_path(raw_args) == "src/app.py"
|
||||
# Malformed raw path must not raise from the extractors.
|
||||
assert extract_repository({"method": "GET", "path": "/repos/acme/../x"}) is None
|
||||
assert extract_target_path({"method": "GET", "path": "/repos/acme/../x"}) is None
|
||||
Reference in New Issue
Block a user