Files
AegisGitea-MCP/tests/test_authz.py
T
Latte 3392d8f69b feat(security): resource-type-aware authorization with fail-closed defaults
Add aegis_gitea_mcp.authz: classify every dispatched call (typed tools and
gitea_request) by resource type (repository/org/user_self/user_owned/
misc_global/admin/unknown) and enforce a type-specific rule in service-PAT
mode, on top of policy + WRITE_MODE. Every decision fails closed:

- org: signed-in user must be a verified org member (Gitea-checked).
- user_owned: owner must be the caller or a member org of the caller.
- user_self: token-owner-scoped endpoints denied (token is the bot's).
- admin: default-deny; allowed only with RAW_API_ALLOW_SENSITIVE opt-in AND a
  verified site admin.
- misc_global: reads allowed, writes denied.
- unknown / unverifiable: denied and audited.

Wire it into the server's service-PAT dispatch: repository calls keep the
existing per-user collaborator check; non-repo calls (previously blanket-denied)
now go through the resource-type gate, opening the org/user/admin surface
safely. Verification results are cached briefly (fail-closed: positives only).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 11:09:50 +02:00

262 lines
10 KiB
Python

"""Tests for resource-type-aware authorization (fail-closed)."""
from __future__ import annotations
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from aegis_gitea_mcp import authz
from aegis_gitea_mcp.authz import (
ResourceClass,
ResourceType,
authorize_non_repository_access,
classify_raw_endpoint,
classify_tool,
verify_org_membership,
verify_site_admin,
)
from aegis_gitea_mcp.config import reset_settings
from aegis_gitea_mcp.errors import ToolError
from aegis_gitea_mcp.tools.arguments import normalize_raw_endpoint
@pytest.fixture
def authz_env(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""Service-PAT-mode settings used by the authorization layer."""
reset_settings()
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
monkeypatch.setenv("GITEA_TOKEN", "service-pat-token")
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
monkeypatch.setenv("ENVIRONMENT", "test")
monkeypatch.setenv("POLICY_FILE_PATH", str(tmp_path / "missing-policy.yaml"))
def _endpoint(path: str) -> str:
return normalize_raw_endpoint(path)
# --- Classification ---------------------------------------------------------
@pytest.mark.parametrize(
("method", "path", "rtype", "ident_field", "ident_value"),
[
("GET", "/repos/acme/app/pulls/1", ResourceType.REPOSITORY, "repository", "acme/app"),
("GET", "/repos/issues/search", ResourceType.REPOSITORY, "repository", None),
("GET", "/orgs/acme/repos", ResourceType.ORG, "org", "acme"),
("GET", "/users/bob/repos", ResourceType.USER_OWNED, "owner", "bob"),
("GET", "/packages/bob/pypi", ResourceType.USER_OWNED, "owner", "bob"),
("GET", "/user/repos", ResourceType.USER_SELF, "repository", None),
("GET", "/notifications", ResourceType.USER_SELF, "repository", None),
("GET", "/markdown", ResourceType.MISC_GLOBAL, "repository", None),
("GET", "/version", ResourceType.MISC_GLOBAL, "repository", None),
("DELETE", "/admin/users/bob", ResourceType.ADMIN, "repository", None),
],
)
def test_classify_raw_endpoint(
method: str, path: str, rtype: ResourceType, ident_field: str, ident_value: str | None
) -> None:
result = classify_raw_endpoint(method, _endpoint(path))
assert result.resource_type is rtype
assert getattr(result, ident_field) == ident_value
def test_classify_tool_maps_typed_tools() -> None:
assert classify_tool("list_org_repositories", {"org": "acme"}).resource_type is ResourceType.ORG
assert classify_tool("list_org_repositories", {"org": "acme"}).org == "acme"
assert classify_tool("list_organizations", {}).resource_type is ResourceType.USER_SELF
# An unrecognized non-repo tool is UNKNOWN (deny).
assert classify_tool("something_new", {}).resource_type is ResourceType.UNKNOWN
def test_classify_tool_gitea_request_uses_path() -> None:
cls = classify_tool("gitea_request", {"method": "GET", "path": "/orgs/acme/repos"})
assert cls.resource_type is ResourceType.ORG
assert cls.org == "acme"
def test_classify_tool_gitea_request_traversal_is_unknown() -> None:
cls = classify_tool("gitea_request", {"method": "GET", "path": "/repos/../../admin"})
assert cls.resource_type is ResourceType.UNKNOWN
# --- Decision matrix (verification mocked) ----------------------------------
async def test_org_member_allowed(authz_env: None, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(authz, "verify_org_membership", AsyncMock(return_value=True))
cls = ResourceClass(ResourceType.ORG, is_write=False, org="acme")
await authorize_non_repository_access(
classification=cls, user_login="alice", tool_name="gitea_request"
)
async def test_org_nonmember_denied(authz_env: None, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(authz, "verify_org_membership", AsyncMock(return_value=False))
cls = ResourceClass(ResourceType.ORG, is_write=False, org="acme")
with pytest.raises(ToolError) as exc_info:
await authorize_non_repository_access(
classification=cls, user_login="alice", tool_name="gitea_request"
)
assert exc_info.value.status_code == 403
async def test_user_owned_self_allowed(authz_env: None, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(authz, "verify_org_membership", AsyncMock(return_value=False))
cls = ResourceClass(ResourceType.USER_OWNED, is_write=False, owner="alice")
await authorize_non_repository_access(
classification=cls, user_login="alice", tool_name="gitea_request"
)
async def test_user_owned_member_org_allowed(
authz_env: None, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.setattr(authz, "verify_org_membership", AsyncMock(return_value=True))
cls = ResourceClass(ResourceType.USER_OWNED, is_write=False, owner="acme")
await authorize_non_repository_access(
classification=cls, user_login="alice", tool_name="gitea_request"
)
async def test_user_owned_other_denied(authz_env: None, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(authz, "verify_org_membership", AsyncMock(return_value=False))
cls = ResourceClass(ResourceType.USER_OWNED, is_write=False, owner="bob")
with pytest.raises(ToolError):
await authorize_non_repository_access(
classification=cls, user_login="alice", tool_name="gitea_request"
)
async def test_user_self_denied_in_service_pat_mode(authz_env: None) -> None:
cls = ResourceClass(ResourceType.USER_SELF, is_write=False)
with pytest.raises(ToolError) as exc_info:
await authorize_non_repository_access(
classification=cls, user_login="alice", tool_name="gitea_request"
)
assert "token-owner-scoped" in str(exc_info.value.detail)
async def test_misc_global_read_allowed_write_denied(authz_env: None) -> None:
read_cls = ResourceClass(ResourceType.MISC_GLOBAL, is_write=False)
await authorize_non_repository_access(
classification=read_cls, user_login="alice", tool_name="gitea_request"
)
write_cls = ResourceClass(ResourceType.MISC_GLOBAL, is_write=True)
with pytest.raises(ToolError):
await authorize_non_repository_access(
classification=write_cls, user_login="alice", tool_name="gitea_request"
)
async def test_admin_denied_without_opt_in(authz_env: None) -> None:
cls = ResourceClass(ResourceType.ADMIN, is_write=True)
with pytest.raises(ToolError) as exc_info:
await authorize_non_repository_access(
classification=cls, user_login="alice", tool_name="gitea_request"
)
assert "RAW_API_ALLOW_SENSITIVE" in str(exc_info.value.detail)
async def test_admin_allowed_only_for_site_admin_with_opt_in(
authz_env: None, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.setenv("RAW_API_ALLOW_SENSITIVE", "true")
reset_settings()
monkeypatch.setattr(authz, "verify_site_admin", AsyncMock(return_value=True))
cls = ResourceClass(ResourceType.ADMIN, is_write=True)
await authorize_non_repository_access(
classification=cls, user_login="root", tool_name="gitea_request"
)
monkeypatch.setattr(authz, "verify_site_admin", AsyncMock(return_value=False))
with pytest.raises(ToolError):
await authorize_non_repository_access(
classification=cls, user_login="alice", tool_name="gitea_request"
)
async def test_unknown_resource_denied(authz_env: None) -> None:
cls = ResourceClass(ResourceType.UNKNOWN, is_write=False)
with pytest.raises(ToolError):
await authorize_non_repository_access(
classification=cls, user_login="alice", tool_name="gitea_request"
)
async def test_repository_without_target_denied(authz_env: None) -> None:
"""A repo-typed call that could not be scoped to owner/repo fails closed."""
cls = ResourceClass(ResourceType.REPOSITORY, is_write=False, repository=None)
with pytest.raises(ToolError):
await authorize_non_repository_access(
classification=cls, user_login="alice", tool_name="gitea_request"
)
# --- Gitea verification helpers (fail-closed) -------------------------------
def _patch_service_response(status_code: int, json_value: object = None) -> Any:
response = MagicMock()
response.status_code = status_code
response.json.return_value = json_value
return response
def _patched_client(response: object) -> Any:
patcher = patch("aegis_gitea_mcp.authz.httpx.AsyncClient")
mock_client_cls = patcher.start()
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=response)
mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
return patcher
async def test_verify_org_membership_204_true(authz_env: None) -> None:
patcher = _patched_client(_patch_service_response(204))
try:
assert await verify_org_membership(org="acme", user_login="alice") is True
finally:
patcher.stop()
async def test_verify_org_membership_404_false(authz_env: None) -> None:
patcher = _patched_client(_patch_service_response(404))
try:
assert await verify_org_membership(org="acme", user_login="alice") is False
finally:
patcher.stop()
async def test_verify_org_membership_unknown_user_false(authz_env: None) -> None:
assert await verify_org_membership(org="acme", user_login="unknown") is False
async def test_verify_site_admin_true_only_when_flag_set(authz_env: None) -> None:
patcher = _patched_client(_patch_service_response(200, {"is_admin": True}))
try:
assert await verify_site_admin(user_login="root") is True
finally:
patcher.stop()
async def test_verify_site_admin_false_when_flag_absent(authz_env: None) -> None:
patcher = _patched_client(_patch_service_response(200, {"is_admin": False}))
try:
assert await verify_site_admin(user_login="alice") is False
finally:
patcher.stop()
async def test_verify_site_admin_non_200_false(authz_env: None) -> None:
patcher = _patched_client(_patch_service_response(403, {}))
try:
assert await verify_site_admin(user_login="alice") is False
finally:
patcher.stop()