"""Tests for resource-type-aware authorization (fail-closed).""" from __future__ import annotations from pathlib import Path from typing import Any from unittest.mock import AsyncMock, MagicMock, patch import pytest from aegis_gitea_mcp import authz from aegis_gitea_mcp.authz import ( ResourceClass, ResourceType, authorize_non_repository_access, classify_raw_endpoint, classify_tool, verify_org_membership, verify_site_admin, ) from aegis_gitea_mcp.config import reset_settings from aegis_gitea_mcp.errors import ToolError from aegis_gitea_mcp.tools.arguments import normalize_raw_endpoint @pytest.fixture def authz_env(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: """Service-PAT-mode settings used by the authorization layer.""" reset_settings() monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") monkeypatch.setenv("GITEA_TOKEN", "service-pat-token") monkeypatch.setenv("MCP_API_KEYS", "a" * 64) monkeypatch.setenv("ENVIRONMENT", "test") monkeypatch.setenv("POLICY_FILE_PATH", str(tmp_path / "missing-policy.yaml")) def _endpoint(path: str) -> str: return normalize_raw_endpoint(path) # --- Classification --------------------------------------------------------- @pytest.mark.parametrize( ("method", "path", "rtype", "ident_field", "ident_value"), [ ("GET", "/repos/acme/app/pulls/1", ResourceType.REPOSITORY, "repository", "acme/app"), ("GET", "/repos/issues/search", ResourceType.REPOSITORY, "repository", None), ("GET", "/orgs/acme/repos", ResourceType.ORG, "org", "acme"), ("GET", "/users/bob/repos", ResourceType.USER_OWNED, "owner", "bob"), ("GET", "/packages/bob/pypi", ResourceType.USER_OWNED, "owner", "bob"), ("GET", "/user/repos", ResourceType.USER_SELF, "repository", None), ("GET", "/notifications", ResourceType.USER_SELF, "repository", None), ("GET", "/markdown", ResourceType.MISC_GLOBAL, "repository", None), ("GET", "/version", ResourceType.MISC_GLOBAL, "repository", None), ("DELETE", "/admin/users/bob", ResourceType.ADMIN, "repository", None), ], ) 
def test_classify_raw_endpoint( method: str, path: str, rtype: ResourceType, ident_field: str, ident_value: str | None ) -> None: result = classify_raw_endpoint(method, _endpoint(path)) assert result.resource_type is rtype assert getattr(result, ident_field) == ident_value def test_classify_tool_maps_typed_tools() -> None: assert classify_tool("list_org_repositories", {"org": "acme"}).resource_type is ResourceType.ORG assert classify_tool("list_org_repositories", {"org": "acme"}).org == "acme" assert classify_tool("list_organizations", {}).resource_type is ResourceType.USER_SELF # An unrecognized non-repo tool is UNKNOWN (deny). assert classify_tool("something_new", {}).resource_type is ResourceType.UNKNOWN def test_classify_tool_gitea_request_uses_path() -> None: cls = classify_tool("gitea_request", {"method": "GET", "path": "/orgs/acme/repos"}) assert cls.resource_type is ResourceType.ORG assert cls.org == "acme" def test_classify_tool_gitea_request_traversal_is_unknown() -> None: cls = classify_tool("gitea_request", {"method": "GET", "path": "/repos/../../admin"}) assert cls.resource_type is ResourceType.UNKNOWN # --- Decision matrix (verification mocked) ---------------------------------- async def test_org_member_allowed(authz_env: None, monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr(authz, "verify_org_membership", AsyncMock(return_value=True)) cls = ResourceClass(ResourceType.ORG, is_write=False, org="acme") await authorize_non_repository_access( classification=cls, user_login="alice", tool_name="gitea_request" ) async def test_org_nonmember_denied(authz_env: None, monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr(authz, "verify_org_membership", AsyncMock(return_value=False)) cls = ResourceClass(ResourceType.ORG, is_write=False, org="acme") with pytest.raises(ToolError) as exc_info: await authorize_non_repository_access( classification=cls, user_login="alice", tool_name="gitea_request" ) assert exc_info.value.status_code == 403 
async def test_user_owned_self_allowed(authz_env: None, monkeypatch: pytest.MonkeyPatch) -> None:
    """A USER_OWNED read is allowed when the owner equals the calling user."""
    monkeypatch.setattr(authz, "verify_org_membership", AsyncMock(return_value=False))
    cls = ResourceClass(ResourceType.USER_OWNED, is_write=False, owner="alice")
    await authorize_non_repository_access(
        classification=cls, user_login="alice", tool_name="gitea_request"
    )


async def test_user_owned_member_org_allowed(
    authz_env: None, monkeypatch: pytest.MonkeyPatch
) -> None:
    """A USER_OWNED read for another owner passes when membership is mocked True."""
    monkeypatch.setattr(authz, "verify_org_membership", AsyncMock(return_value=True))
    cls = ResourceClass(ResourceType.USER_OWNED, is_write=False, owner="acme")
    await authorize_non_repository_access(
        classification=cls, user_login="alice", tool_name="gitea_request"
    )


async def test_user_owned_other_denied(authz_env: None, monkeypatch: pytest.MonkeyPatch) -> None:
    """A USER_OWNED read for another owner is denied when membership is mocked False."""
    monkeypatch.setattr(authz, "verify_org_membership", AsyncMock(return_value=False))
    cls = ResourceClass(ResourceType.USER_OWNED, is_write=False, owner="bob")
    with pytest.raises(ToolError):
        await authorize_non_repository_access(
            classification=cls, user_login="alice", tool_name="gitea_request"
        )


async def test_user_self_denied_in_service_pat_mode(authz_env: None) -> None:
    """USER_SELF access is denied with a "token-owner-scoped" error detail."""
    cls = ResourceClass(ResourceType.USER_SELF, is_write=False)
    with pytest.raises(ToolError) as exc_info:
        await authorize_non_repository_access(
            classification=cls, user_login="alice", tool_name="gitea_request"
        )
    assert "token-owner-scoped" in str(exc_info.value.detail)


async def test_misc_global_read_allowed_write_denied(authz_env: None) -> None:
    """MISC_GLOBAL reads are allowed; MISC_GLOBAL writes raise ``ToolError``."""
    read_cls = ResourceClass(ResourceType.MISC_GLOBAL, is_write=False)
    await authorize_non_repository_access(
        classification=read_cls, user_login="alice", tool_name="gitea_request"
    )
    write_cls = ResourceClass(ResourceType.MISC_GLOBAL, is_write=True)
    with pytest.raises(ToolError):
        await authorize_non_repository_access(
            classification=write_cls, user_login="alice", tool_name="gitea_request"
        )


async def test_admin_denied_without_opt_in(authz_env: None) -> None:
    """ADMIN access is denied and the error detail names RAW_API_ALLOW_SENSITIVE."""
    cls = ResourceClass(ResourceType.ADMIN, is_write=True)
    with pytest.raises(ToolError) as exc_info:
        await authorize_non_repository_access(
            classification=cls, user_login="alice", tool_name="gitea_request"
        )
    assert "RAW_API_ALLOW_SENSITIVE" in str(exc_info.value.detail)


async def test_admin_allowed_only_for_site_admin_with_opt_in(
    authz_env: None, monkeypatch: pytest.MonkeyPatch
) -> None:
    """With the opt-in set, ADMIN access follows the mocked site-admin check."""
    monkeypatch.setenv("RAW_API_ALLOW_SENSITIVE", "true")
    reset_settings()
    try:
        monkeypatch.setattr(authz, "verify_site_admin", AsyncMock(return_value=True))
        cls = ResourceClass(ResourceType.ADMIN, is_write=True)
        await authorize_non_repository_access(
            classification=cls, user_login="root", tool_name="gitea_request"
        )
        monkeypatch.setattr(authz, "verify_site_admin", AsyncMock(return_value=False))
        with pytest.raises(ToolError):
            await authorize_non_repository_access(
                classification=cls, user_login="alice", tool_name="gitea_request"
            )
    finally:
        # Reset again so settings loaded while the opt-in variable was set are
        # not carried into later tests; monkeypatch only restores the
        # environment after this test has returned.
        reset_settings()


async def test_unknown_resource_denied(authz_env: None) -> None:
    """An UNKNOWN classification is always denied."""
    cls = ResourceClass(ResourceType.UNKNOWN, is_write=False)
    with pytest.raises(ToolError):
        await authorize_non_repository_access(
            classification=cls, user_login="alice", tool_name="gitea_request"
        )


async def test_repository_without_target_denied(authz_env: None) -> None:
    """A repo-typed call that could not be scoped to owner/repo fails closed."""
    cls = ResourceClass(ResourceType.REPOSITORY, is_write=False, repository=None)
    with pytest.raises(ToolError):
        await authorize_non_repository_access(
            classification=cls, user_login="alice", tool_name="gitea_request"
        )


# --- Gitea verification helpers (fail-closed) -------------------------------


def _patch_service_response(status_code: int, json_value: object = None) -> Any:
    """Build a mock HTTP response with the given status code and JSON payload."""
    response = MagicMock()
    response.status_code = status_code
    response.json.return_value = json_value
    return response


def _patched_client(response: object) -> Any:
    """Patch ``httpx.AsyncClient`` in ``authz`` so ``get`` returns ``response``.

    The patch is already started when this returns; the caller must call
    ``stop()`` on the returned patcher.
    """
    patcher = patch("aegis_gitea_mcp.authz.httpx.AsyncClient")
    mock_client_cls = patcher.start()
    mock_client = AsyncMock()
    mock_client.get = AsyncMock(return_value=response)
    # Support ``async with httpx.AsyncClient(...) as client`` in the code under test.
    mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
    mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
    return patcher


async def test_verify_org_membership_204_true(authz_env: None) -> None:
    """A 204 response from the mocked client means the user is a member."""
    patcher = _patched_client(_patch_service_response(204))
    try:
        assert await verify_org_membership(org="acme", user_login="alice") is True
    finally:
        patcher.stop()


async def test_verify_org_membership_404_false(authz_env: None) -> None:
    """A 404 response from the mocked client means the user is not a member."""
    patcher = _patched_client(_patch_service_response(404))
    try:
        assert await verify_org_membership(org="acme", user_login="alice") is False
    finally:
        patcher.stop()


async def test_verify_org_membership_unknown_user_false(authz_env: None) -> None:
    """The login ``"unknown"`` is never reported as an organization member."""
    # NOTE(review): no HTTP client is patched here; presumably the "unknown"
    # login is rejected before any request is made - confirm against authz.
    assert await verify_org_membership(org="acme", user_login="unknown") is False


async def test_verify_site_admin_true_only_when_flag_set(authz_env: None) -> None:
    """A 200 response carrying ``is_admin: True`` identifies a site admin."""
    patcher = _patched_client(_patch_service_response(200, {"is_admin": True}))
    try:
        assert await verify_site_admin(user_login="root") is True
    finally:
        patcher.stop()


async def test_verify_site_admin_false_when_flag_absent(authz_env: None) -> None:
    """A 200 response carrying ``is_admin: False`` is not a site admin."""
    # NOTE(review): despite the test name, the payload sets the flag to False
    # rather than omitting it; a payload without the key is not exercised here.
    patcher = _patched_client(_patch_service_response(200, {"is_admin": False}))
    try:
        assert await verify_site_admin(user_login="alice") is False
    finally:
        patcher.stop()


async def test_verify_site_admin_non_200_false(authz_env: None) -> None:
    """A 403 response from the mocked client is treated as not a site admin."""
    patcher = _patched_client(_patch_service_response(403, {}))
    try:
        assert await verify_site_admin(user_login="alice") is False
    finally:
        patcher.stop()