diff --git a/tests/test_raw_api.py b/tests/test_raw_api.py new file mode 100644 index 0000000..400175a --- /dev/null +++ b/tests/test_raw_api.py @@ -0,0 +1,321 @@ +"""Tests for the generic gitea_request raw API dispatch tool.""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import pytest +from fastapi import HTTPException +from pydantic import ValidationError + +from aegis_gitea_mcp.config import reset_settings +from aegis_gitea_mcp.tools.arguments import ( + extract_repository, + extract_target_path, + normalize_raw_endpoint, + parse_raw_repository, + parse_raw_target_path, + raw_is_sensitive, + raw_top_segment, +) +from aegis_gitea_mcp.tools.raw_tools import raw_api_request_tool + + +@pytest.fixture +def raw_env(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + """Minimal API-key-mode settings with policy that allows reads, denies writes.""" + reset_settings() + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("GITEA_TOKEN", "test-token") + monkeypatch.setenv("MCP_API_KEYS", "a" * 64) + monkeypatch.setenv("ENVIRONMENT", "test") + # Point at a non-existent policy file so the default config applies + # (read: allow, write: deny) and tests do not depend on the repo policy.yaml. + monkeypatch.setenv("POLICY_FILE_PATH", str(tmp_path / "missing-policy.yaml")) + + +class StubRawGitea: + """Stub Gitea client capturing raw_request calls.""" + + def __init__(self, response: Any = None) -> None: + self._response: Any = {"ok": True} if response is None else response + self.calls: list[dict[str, Any]] = [] + + async def raw_request( + self, + method: str, + endpoint: str, + *, + params: dict[str, Any] | None = None, + json_body: dict[str, Any] | None = None, + ) -> Any: + self.calls.append( + {"method": method, "endpoint": endpoint, "params": params, "json_body": json_body} + ) + return self._response + + +# --- Handler behavior ------------------------------------------------------ + + +async def test_get_repo_endpoint_allowed_and_parses_repository(raw_env: None) -> None: + """A GET on a repo endpoint is allowed and parses owner/repo from the path.""" + stub = StubRawGitea({"number": 1}) + result = await raw_api_request_tool(stub, {"method": "GET", "path": "/repos/acme/app/pulls/1"}) + + assert result["method"] == "GET" + assert result["path"] == "/api/v1/repos/acme/app/pulls/1" + assert result["write"] is False + assert result["repository"] == "acme/app" + assert result["data"] == {"number": 1} + assert stub.calls[0]["endpoint"] == "/api/v1/repos/acme/app/pulls/1" + + +async def test_lowercase_method_is_normalized(raw_env: None) -> None: + """A lowercase method is uppercased and accepted.""" + stub = StubRawGitea([{"id": 1}]) + result = await raw_api_request_tool(stub, {"method": "get", "path": "/repos/acme/app/issues"}) + assert result["method"] == "GET" + assert result["count"] == 1 + + +async def test_delete_denied_when_write_mode_off(raw_env: None) -> None: + """A write method is denied (no network call) while write-mode is disabled.""" + stub = StubRawGitea() + with pytest.raises(HTTPException) as exc_info: + await raw_api_request_tool(stub, {"method": "DELETE", "path": "/repos/acme/app/issues/1"}) + + assert exc_info.value.status_code == 403 + assert "write mode is disabled" in str(exc_info.value.detail) + assert stub.calls == [] + + +async def test_write_allowed_with_write_mode_and_whitelist( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + """A write succeeds only when write-mode is on, the repo is whitelisted, and policy allows.""" + policy_file = tmp_path / "policy.yaml" + policy_file.write_text("defaults:\n read: allow\n write: allow\n", encoding="utf-8") + + reset_settings() + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("GITEA_TOKEN", "test-token") + monkeypatch.setenv("MCP_API_KEYS", "a" * 64) + monkeypatch.setenv("ENVIRONMENT", "test") + monkeypatch.setenv("POLICY_FILE_PATH", str(policy_file)) + monkeypatch.setenv("WRITE_MODE", "true") + monkeypatch.setenv("WRITE_REPOSITORY_WHITELIST", "acme/app") + + stub = StubRawGitea({"merged": True}) + result = await raw_api_request_tool( + stub, + {"method": "PUT", "path": "/repos/acme/app/pulls/1/merge", "body": {"Do": "merge"}}, + ) + + assert result["write"] is True + assert result["repository"] == "acme/app" + assert stub.calls[0]["json_body"] == {"Do": "merge"} + + +async def test_write_denied_for_repo_outside_whitelist( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + """A write on a repo not in the whitelist is denied even with write-mode on.""" + policy_file = tmp_path / "policy.yaml" + policy_file.write_text("defaults:\n read: allow\n write: allow\n", encoding="utf-8") + + reset_settings() + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("GITEA_TOKEN", "test-token") + monkeypatch.setenv("MCP_API_KEYS", "a" * 64) + monkeypatch.setenv("ENVIRONMENT", "test") + monkeypatch.setenv("POLICY_FILE_PATH", str(policy_file)) + monkeypatch.setenv("WRITE_MODE", "true") + monkeypatch.setenv("WRITE_REPOSITORY_WHITELIST", "acme/other") + + stub = StubRawGitea() + with pytest.raises(HTTPException) as exc_info: + await raw_api_request_tool(stub, {"method": "POST", "path": "/repos/acme/app/issues"}) + + assert exc_info.value.status_code == 403 + assert "whitelist" in str(exc_info.value.detail) + assert stub.calls == [] + + +async def test_non_repository_write_denied(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + """A write that targets no repository is denied (secure default).""" + policy_file = tmp_path / "policy.yaml" + policy_file.write_text("defaults:\n read: allow\n write: allow\n", encoding="utf-8") + + reset_settings() + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("GITEA_TOKEN", "test-token") + monkeypatch.setenv("MCP_API_KEYS", "a" * 64) + monkeypatch.setenv("ENVIRONMENT", "test") + monkeypatch.setenv("POLICY_FILE_PATH", str(policy_file)) + monkeypatch.setenv("WRITE_MODE", "true") + monkeypatch.setenv("WRITE_REPOSITORY_WHITELIST", "acme/app") + + stub = StubRawGitea() + with pytest.raises(HTTPException) as exc_info: + await raw_api_request_tool(stub, {"method": "POST", "path": "/user/repos"}) + + assert exc_info.value.status_code == 403 + assert "repository target" in str(exc_info.value.detail) + assert stub.calls == [] + + +@pytest.mark.parametrize( + "path", + ["/admin/users", "/users/bob/tokens", "/repos/acme/app/hooks", "/user/keys"], +) +async def test_sensitive_paths_denied_on_get(raw_env: None, path: str) -> None: + """Admin/credential surfaces are denied for every method, including GET.""" + stub = StubRawGitea() + with pytest.raises(HTTPException) as exc_info: + await raw_api_request_tool(stub, {"method": "GET", "path": path}) + + assert exc_info.value.status_code == 403 + assert "sensitive-path denylist" in str(exc_info.value.detail) + assert stub.calls == [] + + +async def test_sensitive_path_allowed_with_override( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + """RAW_API_ALLOW_SENSITIVE bypasses the admin/credential denylist.""" + reset_settings() + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("GITEA_TOKEN", "test-token") + monkeypatch.setenv("MCP_API_KEYS", "a" * 64) + monkeypatch.setenv("ENVIRONMENT", "test") + monkeypatch.setenv("POLICY_FILE_PATH", str(tmp_path / "missing.yaml")) + monkeypatch.setenv("RAW_API_ALLOW_SENSITIVE", "true") + + stub = StubRawGitea([{"id": 1}]) + result = await raw_api_request_tool(stub, {"method": "GET", "path": "/admin/users"}) + assert result["data"] == [{"id": 1}] + assert stub.calls[0]["endpoint"] == "/api/v1/admin/users" + + +async def test_cross_repo_search_not_treated_as_repository(raw_env: None) -> None: + """/repos/issues/search is a cross-repo endpoint, so repository is None.""" + stub = StubRawGitea([{"id": 1}]) + result = await raw_api_request_tool( + stub, {"method": "GET", "path": "/repos/issues/search", "query": {"q": "bug"}} + ) + assert result["repository"] is None + assert result["count"] == 1 + assert stub.calls[0]["params"] == {"q": "bug"} + + +async def test_unknown_method_rejected_before_network(raw_env: None) -> None: + """An unknown HTTP method is rejected during validation before any network call.""" + stub = StubRawGitea() + with pytest.raises(ValidationError): + await raw_api_request_tool(stub, {"method": "OPTIONS", "path": "/repos/acme/app"}) + assert stub.calls == [] + + +async def test_path_traversal_rejected(raw_env: None) -> None: + """A path containing '..' is rejected during validation.""" + stub = StubRawGitea() + with pytest.raises(ValidationError): + await raw_api_request_tool( + stub, {"method": "GET", "path": "/repos/acme/app/../../admin/users"} + ) + assert stub.calls == [] + + +async def test_full_url_is_reduced_to_path(raw_env: None) -> None: + """A full URL is reduced to just the API path.""" + stub = StubRawGitea({"name": "app"}) + result = await raw_api_request_tool( + stub, + { + "method": "GET", + "path": "https://gitea.example.com/api/v1/repos/acme/app/contents/src/app.py?ref=main", + }, + ) + assert result["path"] == "/api/v1/repos/acme/app/contents/src/app.py" + assert result["repository"] == "acme/app" + + +async def test_raw_api_disabled(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + """The killswitch disables every dispatch.""" + reset_settings() + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("GITEA_TOKEN", "test-token") + monkeypatch.setenv("MCP_API_KEYS", "a" * 64) + monkeypatch.setenv("ENVIRONMENT", "test") + monkeypatch.setenv("POLICY_FILE_PATH", str(tmp_path / "missing.yaml")) + monkeypatch.setenv("RAW_API_ENABLED", "false") + + stub = StubRawGitea() + with pytest.raises(HTTPException) as exc_info: + await raw_api_request_tool(stub, {"method": "GET", "path": "/repos/acme/app"}) + assert exc_info.value.status_code == 403 + assert "disabled" in str(exc_info.value.detail) + assert stub.calls == [] + + +async def test_large_dict_response_is_truncated(raw_env: None) -> None: + """An oversized object response is returned as a truncated JSON string.""" + big = {"blob": "x" * 50_000} + stub = StubRawGitea(big) + result = await raw_api_request_tool(stub, {"method": "GET", "path": "/repos/acme/app"}) + assert result["truncated"] is True + assert isinstance(result["data"], str) + + +# --- Path parsing helpers -------------------------------------------------- + + +@pytest.mark.parametrize( + ("path", "expected"), + [ + ("/repos/acme/app", "/api/v1/repos/acme/app"), + ("repos/acme/app", "/api/v1/repos/acme/app"), + ("/api/v1/repos/acme/app", "/api/v1/repos/acme/app"), + ("/", "/api/v1"), + ("", "/api/v1"), + ], +) +def test_normalize_raw_endpoint(path: str, expected: str) -> None: + assert normalize_raw_endpoint(path) == expected + + +def test_normalize_raw_endpoint_rejects_traversal() -> None: + with pytest.raises(ValueError): + normalize_raw_endpoint("/repos/acme/../admin") + + +def test_parse_raw_repository_variants() -> None: + assert parse_raw_repository("/api/v1/repos/acme/app/pulls/1") == "acme/app" + assert parse_raw_repository("/api/v1/repos/search") is None + assert parse_raw_repository("/api/v1/repos/issues/search") is None + assert parse_raw_repository("/api/v1/user/repos") is None + + +def test_parse_raw_target_path() -> None: + assert parse_raw_target_path("/api/v1/repos/acme/app/contents/src/app.py") == "src/app.py" + assert parse_raw_target_path("/api/v1/repos/acme/app/raw/README.md") == "README.md" + assert parse_raw_target_path("/api/v1/repos/acme/app/pulls/1") is None + + +def test_raw_top_segment_and_sensitivity() -> None: + assert raw_top_segment("/api/v1/repos/acme/app") == "repos" + assert raw_top_segment("/api/v1") == "" + assert raw_is_sensitive("/api/v1/repos/acme/app/hooks") is True + assert raw_is_sensitive("/api/v1/user/applications/oauth2") is True + assert raw_is_sensitive("/api/v1/repos/acme/app/pulls") is False + + +def test_extractors_are_raw_aware() -> None: + raw_args = {"method": "GET", "path": "/repos/acme/app/contents/src/app.py"} + assert extract_repository(raw_args) == "acme/app" + assert extract_target_path(raw_args) == "src/app.py" + # Malformed raw path must not raise from the extractors. + assert extract_repository({"method": "GET", "path": "/repos/acme/../x"}) is None + assert extract_target_path({"method": "GET", "path": "/repos/acme/../x"}) is None