feat(security): resource-type-aware authorization with fail-closed defaults

Add aegis_gitea_mcp.authz: classify every dispatched call (typed tools and
gitea_request) by resource type (repository/org/user_self/user_owned/
misc_global/admin/unknown) and enforce a type-specific rule in service-PAT
mode, on top of policy + WRITE_MODE. Every decision fails closed:

- org: signed-in user must be a verified org member (Gitea-checked).
- user_owned: owner must be the caller or a member org of the caller.
- user_self: token-owner-scoped endpoints denied (token is the bot's).
- admin: default-deny; allowed only with RAW_API_ALLOW_SENSITIVE opt-in AND a
  verified site admin.
- misc_global: reads allowed, writes denied.
- unknown / unverifiable: denied and audited.

Wire it into the server's service-PAT dispatch: repository calls keep the
existing per-user collaborator check; non-repo calls (previously blanket-denied)
now go through the resource-type gate, opening the org/user/admin surface
safely. Verification results are cached briefly (fail-closed: positives only).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-27 11:09:50 +02:00
parent 2d7f12d0d0
commit 3392d8f69b
4 changed files with 629 additions and 21 deletions
+342
View File
@@ -0,0 +1,342 @@
"""Resource-type-aware authorization (fail-closed).
The public HTTP server runs in *service-PAT mode*: a privileged bot token makes
the actual Gitea calls while a per-user OAuth identity decides what that user is
allowed to reach. For repository-scoped calls the server verifies the user's
collaborator permission on ``owner/repo``. This module closes the rest of the
gap — the admin/user/org/misc surface that ``gitea_request`` can now reach — by
classifying each call by *resource type* and enforcing a type-specific rule.
Every decision fails closed: if a call cannot be classified, or a required
permission cannot be positively verified against Gitea, it is denied and audited.
Rules (enforced only in service-PAT mode; in pure-OAuth mode the user's own
token already scopes every call at Gitea):
* ``repository`` — per-user collaborator permission (handled by the server's
existing repository check; not re-implemented here).
* ``org`` — the signed-in user must be a verified member of the target org.
* ``user_self`` — token-owner-scoped endpoints (``/user``, ``/notifications``).
Denied in service-PAT mode: the data belongs to the bot, not the caller.
* ``user_owned`` — a resource owned by a named user/org (``/users/{name}``,
``/packages/{owner}``). Allowed only when the owner is the caller, or the
caller is a verified member of the owning org.
* ``misc_global`` — instance-wide, read-only utility endpoints (markdown render,
version, gitignore templates …). Reads allowed; writes denied.
* ``admin`` — default deny. Allowed only when the operator has opted in
(``RAW_API_ALLOW_SENSITIVE``) *and* the signed-in user is a verified Gitea
site administrator.
* ``unknown`` — denied.
"""
from __future__ import annotations
import urllib.parse
from dataclasses import dataclass
from enum import Enum
import httpx
from aegis_gitea_mcp.audit import get_audit_logger
from aegis_gitea_mcp.cache import BoundedTTLCache
from aegis_gitea_mcp.config import get_settings
from aegis_gitea_mcp.errors import ToolError
from aegis_gitea_mcp.tools.arguments import (
normalize_raw_endpoint,
parse_raw_repository,
raw_relative_segments,
raw_request_is_write,
)
class ResourceType(str, Enum):
    """Coarse resource categories that drive the authorization decision.

    Each member's value is the lowercase string that appears in audit
    reasons and denial messages.
    """

    # Calls addressed to a repository.
    REPOSITORY = "repository"
    # Calls addressed to an organization.
    ORG = "org"
    # Token-owner-scoped endpoints such as ``/user`` and ``/notifications``.
    USER_SELF = "user_self"
    # Resources owned by a named user or organization.
    USER_OWNED = "user_owned"
    # Instance-wide utility endpoints not owned by any user or org.
    MISC_GLOBAL = "misc_global"
    # Site administration surface.
    ADMIN = "admin"
    # Anything that could not be classified.
    UNKNOWN = "unknown"
@dataclass(frozen=True)
class ResourceClass:
    """Immutable outcome of classifying a single call by resource type.

    Only the target field relevant to the resource type is populated; the
    others keep their ``None`` default.
    """

    # Coarse category the call was assigned to.
    resource_type: ResourceType
    # Whether the call is treated as a write rather than a read.
    is_write: bool
    # Target repository for repository-scoped calls, when one could be parsed.
    repository: str | None = None
    # Target organization for org-scoped calls, when the path names one.
    org: str | None = None
    # Named owner for user-owned resources, when the path names one.
    owner: str | None = None
# First path segments of instance-wide utility endpoints: these resources are
# not owned by any user or organization.
_MISC_GLOBAL_PREFIXES = frozenset(
    (
        "markdown",
        "markup",
        "version",
        "gitignore",
        "licenses",
        "label",
        "topics",
        "nodeinfo",
        "activitypub",
        "miscellaneous",
        "signing-key.gpg",
        "settings",
    )
)

# First path segments of token-owner-scoped ("me"/"my") endpoints.
_USER_SELF_PREFIXES = frozenset(("user", "notifications"))
def classify_raw_endpoint(method: str, endpoint: str) -> ResourceClass:
    """Map a normalized raw ``/api/v1`` endpoint onto a resource class.

    The decision is driven by the first path segment relative to the API
    root; the second segment, when present, names the target org or owner.

    Args:
        method: HTTP method (used only to set the read/write flag).
        endpoint: A normalized ``/api/v1/...`` path.

    Returns:
        The resource classification; ``UNKNOWN`` when nothing matches (deny).
    """
    write = raw_request_is_write(method, endpoint)
    segments = raw_relative_segments(endpoint)

    # No segments at all: treated as a global utility call.
    if not segments:
        return ResourceClass(ResourceType.MISC_GLOBAL, write)

    head = segments[0]
    # Second segment names the org/owner for the org and user-owned branches.
    target = segments[1] if len(segments) >= 2 else None

    if head == "admin":
        return ResourceClass(ResourceType.ADMIN, write)

    if head in ("repos", "repositories"):
        # The parsed repository is None for cross-repo endpoints
        # (search/issues) — those cannot be scoped to a single owner/repo and
        # so fail closed downstream.
        return ResourceClass(
            ResourceType.REPOSITORY,
            write,
            repository=parse_raw_repository(endpoint),
        )

    if head in ("orgs", "org"):
        return ResourceClass(ResourceType.ORG, write, org=target)

    if head in ("users", "packages"):
        return ResourceClass(ResourceType.USER_OWNED, write, owner=target)

    if head in _USER_SELF_PREFIXES:
        return ResourceClass(ResourceType.USER_SELF, write)

    if head in _MISC_GLOBAL_PREFIXES:
        return ResourceClass(ResourceType.MISC_GLOBAL, write)

    return ResourceClass(ResourceType.UNKNOWN, write)
def classify_tool(tool_name: str, arguments: dict[str, object]) -> ResourceClass:
    """Derive the resource class of a dispatched tool call.

    Handles ``gitea_request`` plus the two non-repository typed tools known to
    this gate. Repository-scoped typed tools are covered by the server's
    repository check, so every other tool name is reported as ``UNKNOWN``.

    Args:
        tool_name: Name of the dispatched tool.
        arguments: Arguments supplied with the call.

    Returns:
        The resource classification for the call.
    """
    if tool_name == "list_organizations":
        # Backed by /user/orgs: token-owner-scoped, not attributable to the
        # caller in service-PAT mode.
        return ResourceClass(ResourceType.USER_SELF, is_write=False)

    if tool_name == "list_org_repositories":
        candidate = arguments.get("org")
        org_name = candidate if isinstance(candidate, str) else None
        return ResourceClass(ResourceType.ORG, is_write=False, org=org_name)

    if tool_name != "gitea_request":
        # Any other non-repository tool is unrecognized for this gate.
        return ResourceClass(ResourceType.UNKNOWN, is_write=False)

    raw_method = str(arguments.get("method", "GET"))
    raw_path = str(arguments.get("path", ""))
    try:
        normalized = normalize_raw_endpoint(raw_path)
    except ValueError:
        # A path that cannot be normalized is reported as an unknown write.
        return ResourceClass(ResourceType.UNKNOWN, is_write=True)
    return classify_raw_endpoint(raw_method, normalized)
# Lazily created, bounded, short-TTL caches holding positive verification
# results only (fail-closed: a failed check is never remembered).
_org_membership_cache: BoundedTTLCache[str, bool] | None = None
_site_admin_cache: BoundedTTLCache[str, bool] | None = None
def _get_org_membership_cache() -> BoundedTTLCache[str, bool]:
    """Return the org-membership cache, building it on first use.

    The TTL is read from ``repo_authz_cache_ttl_seconds`` in the settings at
    creation time; the cache holds at most 2048 entries.
    """
    global _org_membership_cache
    existing = _org_membership_cache
    if existing is not None:
        return existing
    ttl_seconds = get_settings().repo_authz_cache_ttl_seconds
    _org_membership_cache = BoundedTTLCache(ttl_seconds=ttl_seconds, max_size=2048)
    return _org_membership_cache
def _get_site_admin_cache() -> BoundedTTLCache[str, bool]:
    """Return the site-admin cache, building it on first use.

    The TTL is read from ``repo_authz_cache_ttl_seconds`` in the settings at
    creation time; the cache holds at most 2048 entries.
    """
    global _site_admin_cache
    existing = _site_admin_cache
    if existing is not None:
        return existing
    ttl_seconds = get_settings().repo_authz_cache_ttl_seconds
    _site_admin_cache = BoundedTTLCache(ttl_seconds=ttl_seconds, max_size=2048)
    return _site_admin_cache
def reset_authz_caches() -> None:
    """Drop both authorization caches (primarily for tests).

    The caches are rebuilt on their next use.
    """
    global _org_membership_cache, _site_admin_cache
    _org_membership_cache = _site_admin_cache = None
async def _service_get(path: str) -> httpx.Response | None:
    """GET ``path`` on Gitea using the service PAT.

    Args:
        path: Absolute API path beginning with ``/`` (e.g. ``/api/v1/...``).

    Returns:
        The HTTP response, whatever its status code, or ``None`` when no
        service token is configured or the request fails at the transport
        level. Callers must inspect the status code themselves.
    """
    settings = get_settings()
    token = settings.gitea_token.strip()
    if not token:
        # Without a service token nothing can be verified; callers fail closed.
        return None

    # Tolerate a base URL configured with a trailing slash so the request does
    # not go out as ``//api/v1/...``, which would make every check fail.
    base_url = str(settings.gitea_base_url).rstrip("/")
    url = f"{base_url}{path}"
    try:
        async with httpx.AsyncClient(timeout=settings.request_timeout_seconds) as client:
            return await client.get(
                url,
                headers={"Authorization": f"token {token}", "Accept": "application/json"},
            )
    except httpx.RequestError:
        return None
async def verify_org_membership(*, org: str, user_login: str) -> bool:
    """Check with Gitea whether ``user_login`` belongs to ``org``.

    Fails closed: a missing org, a missing or ``"unknown"`` login, a
    transport failure, or any response other than 204 all produce ``False``.
    Only positive results are cached.

    Args:
        org: Organization name to check membership in.
        user_login: Login of the signed-in user.

    Returns:
        ``True`` only when membership was positively verified (or a previous
        positive verification is still cached).
    """
    identity_missing = not user_login or user_login == "unknown"
    if not org or identity_missing:
        return False

    # Key on lowercased names so differently-cased spellings share an entry.
    key = ":".join((org.lower(), user_login.lower()))
    membership_cache = _get_org_membership_cache()
    if membership_cache.get(key) is True:
        return True

    # Percent-encode both names so neither can alter the path structure.
    org_segment = urllib.parse.quote(org, safe="")
    user_segment = urllib.parse.quote(user_login, safe="")
    reply = await _service_get(f"/api/v1/orgs/{org_segment}/members/{user_segment}")
    if reply is None or reply.status_code != 204:
        return False

    membership_cache.set(key, True)
    return True
async def verify_site_admin(*, user_login: str) -> bool:
    """Check with Gitea whether ``user_login`` is a site administrator.

    Requires the service PAT to have admin visibility (so ``is_admin`` is
    returned). Fails closed: a missing or ``"unknown"`` login, a transport
    failure, a non-200 response, an undecodable body, or an ``is_admin``
    value that is not exactly ``True`` all produce ``False``. Only positive
    results are cached.

    Args:
        user_login: Login of the signed-in user.

    Returns:
        ``True`` only when the admin flag was positively verified (or a
        previous positive verification is still cached).
    """
    if not user_login or user_login == "unknown":
        return False

    key = user_login.lower()
    admin_cache = _get_site_admin_cache()
    if admin_cache.get(key) is True:
        return True

    # Percent-encode the login so it cannot alter the path structure.
    profile_path = "/api/v1/users/" + urllib.parse.quote(user_login, safe="")
    reply = await _service_get(profile_path)
    if reply is None or reply.status_code != 200:
        return False

    try:
        profile = reply.json()
    except ValueError:
        return False

    confirmed = isinstance(profile, dict) and profile.get("is_admin") is True
    if confirmed:
        admin_cache.set(key, True)
    return confirmed
async def authorize_non_repository_access(
    *,
    classification: ResourceClass,
    user_login: str,
    tool_name: str,
    correlation_id: str | None = None,
) -> None:
    """Enforce the resource-type rule for a non-repository call (service-PAT mode).

    Returns normally when the call is allowed. Every denial is written to the
    audit log before the error is raised.

    Args:
        classification: Resource class of the call being authorized.
        user_login: Login of the signed-in user; surrounding whitespace is
            ignored.
        tool_name: Name of the dispatched tool, recorded in the audit entry.
        correlation_id: Optional request correlation id for the audit entry.

    Raises:
        ToolError: with status 403 when the call is denied. The repository type
            is intentionally not handled here — the server's existing per-user
            collaborator check owns it.
    """
    audit = get_audit_logger()
    settings = get_settings()
    login = (user_login or "").strip()
    # verify_org_membership and verify_site_admin treat the literal login
    # "unknown" as a missing identity. Apply the same rule to the local
    # owner-equals-caller shortcut so an unidentified caller can never match a
    # resource owner by name.
    caller_identified = bool(login) and login != "unknown"

    def _deny(reason: str) -> ToolError:
        # Audit first, then build the error for the caller to raise.
        audit.log_access_denied(
            tool_name=tool_name,
            repository=classification.repository,
            reason=f"resource_authz:{classification.resource_type.value}:{reason}",
            correlation_id=correlation_id,
        )
        return ToolError(
            f"Access denied for {classification.resource_type.value} resource: {reason}",
            status_code=403,
        )

    rtype = classification.resource_type

    if rtype == ResourceType.REPOSITORY:
        # Reached only when a repo-scoped path could not be parsed to owner/repo
        # (e.g. cross-repo search). Cannot verify per-user permission -> deny.
        raise _deny("repository could not be determined")

    if rtype == ResourceType.ORG:
        if not classification.org:
            raise _deny("organization not specified")
        if await verify_org_membership(org=classification.org, user_login=login):
            return
        raise _deny("user is not a verified member of the organization")

    if rtype == ResourceType.USER_OWNED:
        owner = (classification.owner or "").strip()
        if not owner:
            raise _deny("resource owner not specified")
        # Case-insensitive match between the owner and an identified caller.
        if caller_identified and owner.lower() == login.lower():
            return
        # The owner may be an organization the caller belongs to.
        if await verify_org_membership(org=owner, user_login=login):
            return
        raise _deny("resource owner is neither the caller nor a member org")

    if rtype == ResourceType.USER_SELF:
        # Token-owner-scoped data; in service-PAT mode the token is the bot's, so
        # the result cannot be attributed to the caller.
        raise _deny("token-owner-scoped endpoint is not available in service-PAT mode")

    if rtype == ResourceType.MISC_GLOBAL:
        if not classification.is_write:
            return
        # Writes to global utility endpoints are not part of the safe surface.
        raise _deny("write to a global endpoint is not permitted")

    if rtype == ResourceType.ADMIN:
        # Two independent conditions: operator opt-in AND a verified site admin.
        if not settings.raw_api_allow_sensitive:
            raise _deny("admin surface is disabled (set RAW_API_ALLOW_SENSITIVE=true to opt in)")
        if await verify_site_admin(user_login=login):
            return
        raise _deny("user is not a verified site administrator")

    # UNKNOWN, or any type without a rule above: deny.
    raise _deny("unclassified resource")