diff --git a/src/aegis_gitea_mcp/authz.py b/src/aegis_gitea_mcp/authz.py new file mode 100644 index 0000000..ff82442 --- /dev/null +++ b/src/aegis_gitea_mcp/authz.py @@ -0,0 +1,342 @@ +"""Resource-type-aware authorization (fail-closed). + +The public HTTP server runs in *service-PAT mode*: a privileged bot token makes +the actual Gitea calls while a per-user OAuth identity decides what that user is +allowed to reach. For repository-scoped calls the server verifies the user's +collaborator permission on ``owner/repo``. This module closes the rest of the +gap — the admin/user/org/misc surface that ``gitea_request`` can now reach — by +classifying each call by *resource type* and enforcing a type-specific rule. + +Every decision fails closed: if a call cannot be classified, or a required +permission cannot be positively verified against Gitea, it is denied and audited. + +Rules (enforced only in service-PAT mode; in pure-OAuth mode the user's own +token already scopes every call at Gitea): + +* ``repository`` — per-user collaborator permission (handled by the server's + existing repository check; not re-implemented here). +* ``org`` — the signed-in user must be a verified member of the target org. +* ``user_self`` — token-owner-scoped endpoints (``/user``, ``/notifications``). + Denied in service-PAT mode: the data belongs to the bot, not the caller. +* ``user_owned`` — a resource owned by a named user/org (``/users/{name}``, + ``/packages/{owner}``). Allowed only when the owner is the caller, or the + caller is a verified member of the owning org. +* ``misc_global`` — instance-wide, read-only utility endpoints (markdown render, + version, gitignore templates …). Reads allowed; writes fall to policy. +* ``admin`` — default deny. Allowed only when the operator has opted in + (``RAW_API_ALLOW_SENSITIVE``) *and* the signed-in user is a verified Gitea + site administrator. +* ``unknown`` — denied. +""" + +from __future__ import annotations + +import urllib.parse +from dataclasses import dataclass +from enum import Enum + +import httpx + +from aegis_gitea_mcp.audit import get_audit_logger +from aegis_gitea_mcp.cache import BoundedTTLCache +from aegis_gitea_mcp.config import get_settings +from aegis_gitea_mcp.errors import ToolError +from aegis_gitea_mcp.tools.arguments import ( + normalize_raw_endpoint, + parse_raw_repository, + raw_relative_segments, + raw_request_is_write, +) + + +class ResourceType(str, Enum): + """Coarse resource classes used for authorization decisions.""" + + REPOSITORY = "repository" + ORG = "org" + USER_SELF = "user_self" + USER_OWNED = "user_owned" + MISC_GLOBAL = "misc_global" + ADMIN = "admin" + UNKNOWN = "unknown" + + +@dataclass(frozen=True) +class ResourceClass: + """Result of classifying a call by resource type.""" + + resource_type: ResourceType + is_write: bool + repository: str | None = None + org: str | None = None + owner: str | None = None + + +# Instance-wide, read-only utility prefixes: not owned by any user/org. +_MISC_GLOBAL_PREFIXES = frozenset( + { + "markdown", + "markup", + "version", + "gitignore", + "licenses", + "label", + "topics", + "nodeinfo", + "activitypub", + "miscellaneous", + "signing-key.gpg", + "settings", + } +) + +# Token-owner-scoped prefixes ("me"/"my" endpoints). +_USER_SELF_PREFIXES = frozenset({"user", "notifications"}) + + +def classify_raw_endpoint(method: str, endpoint: str) -> ResourceClass: + """Classify a normalized raw ``/api/v1`` endpoint by resource type. + + Args: + method: HTTP method (used only to set the read/write flag). + endpoint: A normalized ``/api/v1/...`` path. + + Returns: + The resource classification; ``UNKNOWN`` when nothing matches (deny). + """ + is_write = raw_request_is_write(method, endpoint) + rel = raw_relative_segments(endpoint) + if not rel: + return ResourceClass(ResourceType.MISC_GLOBAL, is_write) + + top = rel[0] + + if top == "admin": + return ResourceClass(ResourceType.ADMIN, is_write) + + if top in {"repos", "repositories"}: + repository = parse_raw_repository(endpoint) + # repository is None for cross-repo endpoints (search/issues) — those + # cannot be scoped to a single owner/repo and so fail closed downstream. + return ResourceClass(ResourceType.REPOSITORY, is_write, repository=repository) + + if top in {"orgs", "org"}: + org = rel[1] if len(rel) >= 2 else None + return ResourceClass(ResourceType.ORG, is_write, org=org) + + if top == "users": + owner = rel[1] if len(rel) >= 2 else None + return ResourceClass(ResourceType.USER_OWNED, is_write, owner=owner) + + if top == "packages": + owner = rel[1] if len(rel) >= 2 else None + return ResourceClass(ResourceType.USER_OWNED, is_write, owner=owner) + + if top in _USER_SELF_PREFIXES: + return ResourceClass(ResourceType.USER_SELF, is_write) + + if top in _MISC_GLOBAL_PREFIXES: + return ResourceClass(ResourceType.MISC_GLOBAL, is_write) + + return ResourceClass(ResourceType.UNKNOWN, is_write) + + +def classify_tool(tool_name: str, arguments: dict[str, object]) -> ResourceClass: + """Classify a dispatched tool call (typed tool or ``gitea_request``). + + Repository-scoped typed tools are handled by the server's repository check, + so this primarily classifies the non-repo surface that this module gates. + """ + if tool_name == "gitea_request": + method = str(arguments.get("method", "GET")) + path = str(arguments.get("path", "")) + try: + endpoint = normalize_raw_endpoint(path) + except ValueError: + return ResourceClass(ResourceType.UNKNOWN, is_write=True) + return classify_raw_endpoint(method, endpoint) + + if tool_name == "list_org_repositories": + org = arguments.get("org") + return ResourceClass( + ResourceType.ORG, is_write=False, org=org if isinstance(org, str) else None + ) + + if tool_name == "list_organizations": + # Backed by /user/orgs: token-owner-scoped, not attributable to the caller + # in service-PAT mode. + return ResourceClass(ResourceType.USER_SELF, is_write=False) + + # Any other non-repository tool is unrecognized for the purpose of this gate. + return ResourceClass(ResourceType.UNKNOWN, is_write=False) + + +# Bounded, short-TTL caches for positive verification results (fail-closed: +# only successful checks are cached). +_org_membership_cache: BoundedTTLCache[str, bool] | None = None +_site_admin_cache: BoundedTTLCache[str, bool] | None = None + + +def _get_org_membership_cache() -> BoundedTTLCache[str, bool]: + global _org_membership_cache + if _org_membership_cache is None: + ttl = get_settings().repo_authz_cache_ttl_seconds + _org_membership_cache = BoundedTTLCache(ttl_seconds=ttl, max_size=2048) + return _org_membership_cache + + +def _get_site_admin_cache() -> BoundedTTLCache[str, bool]: + global _site_admin_cache + if _site_admin_cache is None: + ttl = get_settings().repo_authz_cache_ttl_seconds + _site_admin_cache = BoundedTTLCache(ttl_seconds=ttl, max_size=2048) + return _site_admin_cache + + +def reset_authz_caches() -> None: + """Reset authorization caches (primarily for tests).""" + global _org_membership_cache, _site_admin_cache + _org_membership_cache = None + _site_admin_cache = None + + +async def _service_get(path: str) -> httpx.Response | None: + """GET ``path`` on Gitea with the service PAT; None on transport failure.""" + settings = get_settings() + token = settings.gitea_token.strip() + if not token: + return None + url = f"{settings.gitea_base_url}{path}" + try: + async with httpx.AsyncClient(timeout=settings.request_timeout_seconds) as client: + return await client.get( + url, + headers={"Authorization": f"token {token}", "Accept": "application/json"}, + ) + except httpx.RequestError: + return None + + +async def verify_org_membership(*, org: str, user_login: str) -> bool: + """Return True only if ``user_login`` is a verified member of ``org``. + + Fails closed: any transport error, non-204 response, or missing identity + yields False. + """ + if not org or not user_login or user_login == "unknown": + return False + cache_key = f"{org.lower()}:{user_login.lower()}" + cache = _get_org_membership_cache() + if cache.get(cache_key) is True: + return True + + encoded_org = urllib.parse.quote(org, safe="") + encoded_user = urllib.parse.quote(user_login, safe="") + response = await _service_get(f"/api/v1/orgs/{encoded_org}/members/{encoded_user}") + if response is not None and response.status_code == 204: + cache.set(cache_key, True) + return True + return False + + +async def verify_site_admin(*, user_login: str) -> bool: + """Return True only if ``user_login`` is a verified Gitea site administrator. + + Requires the service PAT to have admin visibility (so ``is_admin`` is + returned). Fails closed on any error or when the flag is not positively True. + """ + if not user_login or user_login == "unknown": + return False + cache_key = user_login.lower() + cache = _get_site_admin_cache() + if cache.get(cache_key) is True: + return True + + encoded_user = urllib.parse.quote(user_login, safe="") + response = await _service_get(f"/api/v1/users/{encoded_user}") + if response is None or response.status_code != 200: + return False + try: + payload = response.json() + except ValueError: + return False + if isinstance(payload, dict) and payload.get("is_admin") is True: + cache.set(cache_key, True) + return True + return False + + +async def authorize_non_repository_access( + *, + classification: ResourceClass, + user_login: str, + tool_name: str, + correlation_id: str | None = None, +) -> None: + """Enforce the resource-type rule for a non-repository call (service-PAT mode). + + Raises: + ToolError: with status 403 when the call is denied. The repository type + is intentionally not handled here — the server's existing per-user + collaborator check owns it. + """ + audit = get_audit_logger() + settings = get_settings() + login = (user_login or "").strip() + + def _deny(reason: str) -> ToolError: + audit.log_access_denied( + tool_name=tool_name, + repository=classification.repository, + reason=f"resource_authz:{classification.resource_type.value}:{reason}", + correlation_id=correlation_id, + ) + return ToolError( + f"Access denied for {classification.resource_type.value} resource: {reason}", + status_code=403, + ) + + rtype = classification.resource_type + + if rtype == ResourceType.REPOSITORY: + # Reached only when a repo-scoped path could not be parsed to owner/repo + # (e.g. cross-repo search). Cannot verify per-user permission -> deny. + raise _deny("repository could not be determined") + + if rtype == ResourceType.ORG: + if not classification.org: + raise _deny("organization not specified") + if await verify_org_membership(org=classification.org, user_login=login): + return + raise _deny("user is not a verified member of the organization") + + if rtype == ResourceType.USER_OWNED: + owner = (classification.owner or "").strip() + if not owner: + raise _deny("resource owner not specified") + if owner.lower() == login.lower() and login: + return + # The owner may be an organization the caller belongs to. + if await verify_org_membership(org=owner, user_login=login): + return + raise _deny("resource owner is neither the caller nor a member org") + + if rtype == ResourceType.USER_SELF: + # Token-owner-scoped data; in service-PAT mode the token is the bot's, so + # the result cannot be attributed to the caller. + raise _deny("token-owner-scoped endpoint is not available in service-PAT mode") + + if rtype == ResourceType.MISC_GLOBAL: + if not classification.is_write: + return + # Writes to global utility endpoints are not part of the safe surface. + raise _deny("write to a global endpoint is not permitted") + + if rtype == ResourceType.ADMIN: + if not settings.raw_api_allow_sensitive: + raise _deny("admin surface is disabled (set RAW_API_ALLOW_SENSITIVE=true to opt in)") + if await verify_site_admin(user_login=login): + return + raise _deny("user is not a verified site administrator") + + raise _deny("unclassified resource") diff --git a/src/aegis_gitea_mcp/server.py b/src/aegis_gitea_mcp/server.py index 3831303..b266580 100644 --- a/src/aegis_gitea_mcp/server.py +++ b/src/aegis_gitea_mcp/server.py @@ -19,6 +19,7 @@ from fastapi.responses import JSONResponse, PlainTextResponse, RedirectResponse, from pydantic import BaseModel, Field, ValidationError from aegis_gitea_mcp.audit import get_audit_logger +from aegis_gitea_mcp.authz import authorize_non_repository_access, classify_tool from aegis_gitea_mcp.automation import AutomationError, AutomationManager from aegis_gitea_mcp.cache import BoundedTTLCache from aegis_gitea_mcp.config import get_settings @@ -1143,33 +1144,34 @@ async def _execute_tool_call( raise HTTPException(status_code=401, detail="Missing authenticated user token context") if settings.gitea_token.strip(): - if not repository: - # list_repositories is not repo-scoped; the handler scopes it to - # the authenticated user's own repositories instead. Every other - # tool requires a repository target so per-user permission can be - # verified before the privileged service PAT is used. - if tool_name != "list_repositories": - audit.log_access_denied( - tool_name=tool_name, - reason="service_pat_requires_repository_target", - correlation_id=correlation_id, - ) - raise HTTPException( - status_code=403, - detail=( - "Service PAT mode requires a repository target so per-user " - "permission can be verified." - ), - ) - else: - user_login = get_gitea_user_login() + user_login = get_gitea_user_login() or "" + if repository: + # Repository-scoped: verify the signed-in user's collaborator + # permission before the privileged service PAT is used. await _verify_user_repository_access( repository=repository, required_scope=required_scope, - user_login=user_login or "", + user_login=user_login, correlation_id=correlation_id, tool_name=tool_name, ) + elif tool_name == "list_repositories": + # Not repo-scoped; the handler scopes it to the authenticated + # user's own repositories. + pass + else: + # Non-repository call (org/user/admin/misc, incl. gitea_request): + # classify by resource type and enforce the fail-closed rule. + classification = classify_tool(tool_name, arguments) + try: + await authorize_non_repository_access( + classification=classification, + user_login=user_login, + tool_name=tool_name, + correlation_id=correlation_id, + ) + except ToolError as exc: + raise HTTPException(status_code=exc.status_code, detail=exc.detail) from exc # In OAuth mode, Gitea OIDC access_tokens can't call the Gitea REST API # (they only carry OIDC scopes). If a service PAT is configured via diff --git a/tests/conftest.py b/tests/conftest.py index 00d6374..4eba9d3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,6 +7,7 @@ import pytest from aegis_gitea_mcp.audit import reset_audit_logger from aegis_gitea_mcp.auth import reset_validator +from aegis_gitea_mcp.authz import reset_authz_caches from aegis_gitea_mcp.config import reset_settings from aegis_gitea_mcp.oauth import reset_oauth_validator from aegis_gitea_mcp.oauth_flow import reset_oauth_client_registry @@ -27,6 +28,7 @@ def reset_globals(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Generator[ reset_oauth_validator() reset_oauth_client_registry() reset_repo_authz_cache() + reset_authz_caches() reset_policy_engine() reset_rate_limiter() reset_metrics_registry() @@ -45,6 +47,7 @@ def reset_globals(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Generator[ reset_oauth_validator() reset_oauth_client_registry() reset_repo_authz_cache() + reset_authz_caches() reset_policy_engine() reset_rate_limiter() reset_metrics_registry() diff --git a/tests/test_authz.py b/tests/test_authz.py new file mode 100644 index 0000000..e5ca920 --- /dev/null +++ b/tests/test_authz.py @@ -0,0 +1,261 @@ +"""Tests for resource-type-aware authorization (fail-closed).""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from aegis_gitea_mcp import authz +from aegis_gitea_mcp.authz import ( + ResourceClass, + ResourceType, + authorize_non_repository_access, + classify_raw_endpoint, + classify_tool, + verify_org_membership, + verify_site_admin, +) +from aegis_gitea_mcp.config import reset_settings +from aegis_gitea_mcp.errors import ToolError +from aegis_gitea_mcp.tools.arguments import normalize_raw_endpoint + + +@pytest.fixture +def authz_env(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + """Service-PAT-mode settings used by the authorization layer.""" + reset_settings() + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("GITEA_TOKEN", "service-pat-token") + monkeypatch.setenv("MCP_API_KEYS", "a" * 64) + monkeypatch.setenv("ENVIRONMENT", "test") + monkeypatch.setenv("POLICY_FILE_PATH", str(tmp_path / "missing-policy.yaml")) + + +def _endpoint(path: str) -> str: + return normalize_raw_endpoint(path) + + +# --- Classification --------------------------------------------------------- + + +@pytest.mark.parametrize( + ("method", "path", "rtype", "ident_field", "ident_value"), + [ + ("GET", "/repos/acme/app/pulls/1", ResourceType.REPOSITORY, "repository", "acme/app"), + ("GET", "/repos/issues/search", ResourceType.REPOSITORY, "repository", None), + ("GET", "/orgs/acme/repos", ResourceType.ORG, "org", "acme"), + ("GET", "/users/bob/repos", ResourceType.USER_OWNED, "owner", "bob"), + ("GET", "/packages/bob/pypi", ResourceType.USER_OWNED, "owner", "bob"), + ("GET", "/user/repos", ResourceType.USER_SELF, "repository", None), + ("GET", "/notifications", ResourceType.USER_SELF, "repository", None), + ("GET", "/markdown", ResourceType.MISC_GLOBAL, "repository", None), + ("GET", "/version", ResourceType.MISC_GLOBAL, "repository", None), + ("DELETE", "/admin/users/bob", ResourceType.ADMIN, "repository", None), + ], +) +def test_classify_raw_endpoint( + method: str, path: str, rtype: ResourceType, ident_field: str, ident_value: str | None +) -> None: + result = classify_raw_endpoint(method, _endpoint(path)) + assert result.resource_type is rtype + assert getattr(result, ident_field) == ident_value + + +def test_classify_tool_maps_typed_tools() -> None: + assert classify_tool("list_org_repositories", {"org": "acme"}).resource_type is ResourceType.ORG + assert classify_tool("list_org_repositories", {"org": "acme"}).org == "acme" + assert classify_tool("list_organizations", {}).resource_type is ResourceType.USER_SELF + # An unrecognized non-repo tool is UNKNOWN (deny). + assert classify_tool("something_new", {}).resource_type is ResourceType.UNKNOWN + + +def test_classify_tool_gitea_request_uses_path() -> None: + cls = classify_tool("gitea_request", {"method": "GET", "path": "/orgs/acme/repos"}) + assert cls.resource_type is ResourceType.ORG + assert cls.org == "acme" + + +def test_classify_tool_gitea_request_traversal_is_unknown() -> None: + cls = classify_tool("gitea_request", {"method": "GET", "path": "/repos/../../admin"}) + assert cls.resource_type is ResourceType.UNKNOWN + + +# --- Decision matrix (verification mocked) ---------------------------------- + + +async def test_org_member_allowed(authz_env: None, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(authz, "verify_org_membership", AsyncMock(return_value=True)) + cls = ResourceClass(ResourceType.ORG, is_write=False, org="acme") + await authorize_non_repository_access( + classification=cls, user_login="alice", tool_name="gitea_request" + ) + + +async def test_org_nonmember_denied(authz_env: None, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(authz, "verify_org_membership", AsyncMock(return_value=False)) + cls = ResourceClass(ResourceType.ORG, is_write=False, org="acme") + with pytest.raises(ToolError) as exc_info: + await authorize_non_repository_access( + classification=cls, user_login="alice", tool_name="gitea_request" + ) + assert exc_info.value.status_code == 403 + + +async def test_user_owned_self_allowed(authz_env: None, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(authz, "verify_org_membership", AsyncMock(return_value=False)) + cls = ResourceClass(ResourceType.USER_OWNED, is_write=False, owner="alice") + await authorize_non_repository_access( + classification=cls, user_login="alice", tool_name="gitea_request" + ) + + +async def test_user_owned_member_org_allowed( + authz_env: None, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setattr(authz, "verify_org_membership", AsyncMock(return_value=True)) + cls = ResourceClass(ResourceType.USER_OWNED, is_write=False, owner="acme") + await authorize_non_repository_access( + classification=cls, user_login="alice", tool_name="gitea_request" + ) + + +async def test_user_owned_other_denied(authz_env: None, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(authz, "verify_org_membership", AsyncMock(return_value=False)) + cls = ResourceClass(ResourceType.USER_OWNED, is_write=False, owner="bob") + with pytest.raises(ToolError): + await authorize_non_repository_access( + classification=cls, user_login="alice", tool_name="gitea_request" + ) + + +async def test_user_self_denied_in_service_pat_mode(authz_env: None) -> None: + cls = ResourceClass(ResourceType.USER_SELF, is_write=False) + with pytest.raises(ToolError) as exc_info: + await authorize_non_repository_access( + classification=cls, user_login="alice", tool_name="gitea_request" + ) + assert "token-owner-scoped" in str(exc_info.value.detail) + + +async def test_misc_global_read_allowed_write_denied(authz_env: None) -> None: + read_cls = ResourceClass(ResourceType.MISC_GLOBAL, is_write=False) + await authorize_non_repository_access( + classification=read_cls, user_login="alice", tool_name="gitea_request" + ) + write_cls = ResourceClass(ResourceType.MISC_GLOBAL, is_write=True) + with pytest.raises(ToolError): + await authorize_non_repository_access( + classification=write_cls, user_login="alice", tool_name="gitea_request" + ) + + +async def test_admin_denied_without_opt_in(authz_env: None) -> None: + cls = ResourceClass(ResourceType.ADMIN, is_write=True) + with pytest.raises(ToolError) as exc_info: + await authorize_non_repository_access( + classification=cls, user_login="alice", tool_name="gitea_request" + ) + assert "RAW_API_ALLOW_SENSITIVE" in str(exc_info.value.detail) + + +async def test_admin_allowed_only_for_site_admin_with_opt_in( + authz_env: None, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setenv("RAW_API_ALLOW_SENSITIVE", "true") + reset_settings() + + monkeypatch.setattr(authz, "verify_site_admin", AsyncMock(return_value=True)) + cls = ResourceClass(ResourceType.ADMIN, is_write=True) + await authorize_non_repository_access( + classification=cls, user_login="root", tool_name="gitea_request" + ) + + monkeypatch.setattr(authz, "verify_site_admin", AsyncMock(return_value=False)) + with pytest.raises(ToolError): + await authorize_non_repository_access( + classification=cls, user_login="alice", tool_name="gitea_request" + ) + + +async def test_unknown_resource_denied(authz_env: None) -> None: + cls = ResourceClass(ResourceType.UNKNOWN, is_write=False) + with pytest.raises(ToolError): + await authorize_non_repository_access( + classification=cls, user_login="alice", tool_name="gitea_request" + ) + + +async def test_repository_without_target_denied(authz_env: None) -> None: + """A repo-typed call that could not be scoped to owner/repo fails closed.""" + cls = ResourceClass(ResourceType.REPOSITORY, is_write=False, repository=None) + with pytest.raises(ToolError): + await authorize_non_repository_access( + classification=cls, user_login="alice", tool_name="gitea_request" + ) + + +# --- Gitea verification helpers (fail-closed) ------------------------------- + + +def _patch_service_response(status_code: int, json_value: object = None) -> Any: + response = MagicMock() + response.status_code = status_code + response.json.return_value = json_value + return response + + +def _patched_client(response: object) -> Any: + patcher = patch("aegis_gitea_mcp.authz.httpx.AsyncClient") + mock_client_cls = patcher.start() + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=response) + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + return patcher + + +async def test_verify_org_membership_204_true(authz_env: None) -> None: + patcher = _patched_client(_patch_service_response(204)) + try: + assert await verify_org_membership(org="acme", user_login="alice") is True + finally: + patcher.stop() + + +async def test_verify_org_membership_404_false(authz_env: None) -> None: + patcher = _patched_client(_patch_service_response(404)) + try: + assert await verify_org_membership(org="acme", user_login="alice") is False + finally: + patcher.stop() + + +async def test_verify_org_membership_unknown_user_false(authz_env: None) -> None: + assert await verify_org_membership(org="acme", user_login="unknown") is False + + +async def test_verify_site_admin_true_only_when_flag_set(authz_env: None) -> None: + patcher = _patched_client(_patch_service_response(200, {"is_admin": True})) + try: + assert await verify_site_admin(user_login="root") is True + finally: + patcher.stop() + + +async def test_verify_site_admin_false_when_flag_absent(authz_env: None) -> None: + patcher = _patched_client(_patch_service_response(200, {"is_admin": False})) + try: + assert await verify_site_admin(user_login="alice") is False + finally: + patcher.stop() + + +async def test_verify_site_admin_non_200_false(authz_env: None) -> None: + patcher = _patched_client(_patch_service_response(403, {})) + try: + assert await verify_site_admin(user_login="alice") is False + finally: + patcher.stop()