From e873d0325b0595cd6a55aec56e1c0ff93c19ee37 Mon Sep 17 00:00:00 2001 From: latte Date: Sun, 14 Jun 2026 17:07:19 +0200 Subject: [PATCH] feat: scope list_repositories to the authenticated user in service-PAT mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously list_repositories was blocked in service-PAT mode because it has no repository target for the per-user permission check, so users could not list their repositories at all (the connector surfaced a generic error). list_repositories now returns only the repositories the signed-in user owns or contributes to, instead of everything the bot token can see: - gitea_client.py: add list_user_repositories(login) — resolves the user id and queries /api/v1/repos/search with the uid filter. - repository.py: list_repositories_tool uses the user-scoped path when a service PAT is configured and a user login is present; pure-OAuth mode still uses the user's own /user/repos. - server.py: allow list_repositories through the service-PAT guard (it is scoped to the user in the handler); all other tools still require a repository target. - README.md: document the new user-scoped behavior and its visibility caveat. Tests: user-scoped client method (uid resolution + unknown user), PAT-mode tool scoping, and conftest now clears the request context between tests to prevent contextvar login leakage across files. Co-Authored-By: Claude Opus 4.8 --- README.md | 2 +- src/aegis_gitea_mcp/gitea_client.py | 51 +++++++++++++++++++++++++ src/aegis_gitea_mcp/server.py | 42 +++++++++++--------- src/aegis_gitea_mcp/tools/repository.py | 12 +++++- tests/conftest.py | 3 ++ tests/test_gitea_client.py | 33 ++++++++++++++++ tests/test_repository_tools.py | 23 +++++++++++ 7 files changed, 146 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index 81bcc11..95fc2ab 100644 --- a/README.md +++ b/README.md @@ -74,7 +74,7 @@ GITEA_TOKEN= This does **not** weaken per-user security. OAuth remains authoritative: before every repository call the server verifies that the signed-in user has permission on the target repo through Gitea (`_verify_user_repository_access`) and denies it otherwise. The PAT only performs the API call after that check; OAuth provides identity, per-user authorization, and audit attribution. -Note: with a service PAT, `list_repositories` is intentionally blocked because it has no repository target to authorize per user — use the repository-scoped tools (`get_repository_info`, `get_file_contents`, `list_issues`, …) instead. +Note: with a service PAT, `list_repositories` is **scoped to the signed-in user** — it returns only the repositories that user owns or contributes to (resolved via Gitea's repo search with the `uid` filter), not everything the bot can see. Visibility of private repos still depends on what the service token itself can access. All other tools require an explicit `owner`/`repo` and run the per-user permission check first. ### 2a) Required writable volumes (read-only container) diff --git a/src/aegis_gitea_mcp/gitea_client.py b/src/aegis_gitea_mcp/gitea_client.py index 50677e6..4e8f5fe 100644 --- a/src/aegis_gitea_mcp/gitea_client.py +++ b/src/aegis_gitea_mcp/gitea_client.py @@ -173,6 +173,57 @@ class GiteaClient: ) raise + async def list_user_repositories(self, login: str) -> list[dict[str, Any]]: + """List repositories the given user owns or contributes to. + + Used in service-PAT mode so ``list_repositories`` returns repositories + scoped to the authenticated user instead of everything the service token + can see. Resolves the user id, then queries Gitea's repo search with the + ``uid`` filter. Visibility of private repos still depends on what the + service token itself can see. + """ + correlation_id = self.audit.log_tool_invocation( + tool_name="list_user_repositories", + result_status="pending", + ) + try: + user = await self._request( + "GET", + f"/api/v1/users/{quote(login, safe='')}", + correlation_id=correlation_id, + ) + uid = user.get("id") if isinstance(user, dict) else None + if not isinstance(uid, int): + return [] + + result = await self._request( + "GET", + "/api/v1/repos/search", + params={"uid": uid, "limit": 50, "page": 1}, + correlation_id=correlation_id, + ) + repositories: list[dict[str, Any]] = [] + if isinstance(result, dict): + data = result.get("data", []) + if isinstance(data, list): + repositories = [item for item in data if isinstance(item, dict)] + + self.audit.log_tool_invocation( + tool_name="list_user_repositories", + correlation_id=correlation_id, + result_status="success", + params={"login": login, "count": len(repositories)}, + ) + return repositories + except Exception as exc: + self.audit.log_tool_invocation( + tool_name="list_user_repositories", + correlation_id=correlation_id, + result_status="error", + error=str(exc), + ) + raise + async def get_repository(self, owner: str, repo: str) -> dict[str, Any]: """Get repository metadata.""" repo_id = f"{owner}/{repo}" diff --git a/src/aegis_gitea_mcp/server.py b/src/aegis_gitea_mcp/server.py index 0d26659..b5074d8 100644 --- a/src/aegis_gitea_mcp/server.py +++ b/src/aegis_gitea_mcp/server.py @@ -1165,26 +1165,32 @@ async def _execute_tool_call( if settings.gitea_token.strip(): if not repository: - audit.log_access_denied( - tool_name=tool_name, - reason="service_pat_requires_repository_target", + # list_repositories is not repo-scoped; the handler scopes it to + # the authenticated user's own repositories instead. Every other + # tool requires a repository target so per-user permission can be + # verified before the privileged service PAT is used. + if tool_name != "list_repositories": + audit.log_access_denied( + tool_name=tool_name, + reason="service_pat_requires_repository_target", + correlation_id=correlation_id, + ) + raise HTTPException( + status_code=403, + detail=( + "Service PAT mode requires a repository target so per-user " + "permission can be verified." + ), + ) + else: + user_login = get_gitea_user_login() + await _verify_user_repository_access( + repository=repository, + required_scope=required_scope, + user_login=user_login or "", correlation_id=correlation_id, + tool_name=tool_name, ) - raise HTTPException( - status_code=403, - detail=( - "Service PAT mode requires a repository target so per-user " - "permission can be verified." - ), - ) - user_login = get_gitea_user_login() - await _verify_user_repository_access( - repository=repository, - required_scope=required_scope, - user_login=user_login or "", - correlation_id=correlation_id, - tool_name=tool_name, - ) # In OAuth mode, Gitea OIDC access_tokens can't call the Gitea REST API # (they only carry OIDC scopes). If a service PAT is configured via diff --git a/src/aegis_gitea_mcp/tools/repository.py b/src/aegis_gitea_mcp/tools/repository.py index 101f3da..f160fd8 100644 --- a/src/aegis_gitea_mcp/tools/repository.py +++ b/src/aegis_gitea_mcp/tools/repository.py @@ -6,12 +6,14 @@ import base64 import binascii from typing import Any +from aegis_gitea_mcp.config import get_settings from aegis_gitea_mcp.gitea_client import ( GiteaAuthenticationError, GiteaAuthorizationError, GiteaClient, GiteaError, ) +from aegis_gitea_mcp.request_context import get_gitea_user_login from aegis_gitea_mcp.response_limits import limit_items, limit_text from aegis_gitea_mcp.security import sanitize_untrusted_text from aegis_gitea_mcp.tools.arguments import ( @@ -33,8 +35,16 @@ async def list_repositories_tool(gitea: GiteaClient, arguments: dict[str, Any]) Response payload with bounded repository list. """ ListRepositoriesArgs.model_validate(arguments) + settings = get_settings() + login = get_gitea_user_login() try: - repositories = await gitea.list_repositories() + # In service-PAT mode the API token is the bot's, so scope the listing to + # the authenticated user's own repositories. In pure-OAuth mode the API + # token already belongs to the user, so Gitea scopes /user/repos for us. + if settings.gitea_token.strip() and login and login != "unknown": + repositories = await gitea.list_user_repositories(login) + else: + repositories = await gitea.list_repositories() simplified = [ { "owner": repo.get("owner", {}).get("login", ""), diff --git a/tests/conftest.py b/tests/conftest.py index d6962d3..00d6374 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,6 +13,7 @@ from aegis_gitea_mcp.oauth_flow import reset_oauth_client_registry from aegis_gitea_mcp.observability import reset_metrics_registry from aegis_gitea_mcp.policy import reset_policy_engine from aegis_gitea_mcp.rate_limit import reset_rate_limiter +from aegis_gitea_mcp.request_context import clear_gitea_auth_context from aegis_gitea_mcp.server import reset_repo_authz_cache @@ -29,6 +30,7 @@ def reset_globals(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Generator[ reset_policy_engine() reset_rate_limiter() reset_metrics_registry() + clear_gitea_auth_context() # Use temporary directory for audit logs in tests audit_log_path = tmp_path / "audit.log" @@ -46,6 +48,7 @@ def reset_globals(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Generator[ reset_policy_engine() reset_rate_limiter() reset_metrics_registry() + clear_gitea_auth_context() @pytest.fixture diff --git a/tests/test_gitea_client.py b/tests/test_gitea_client.py index e2fddd9..02bc9af 100644 --- a/tests/test_gitea_client.py +++ b/tests/test_gitea_client.py @@ -213,6 +213,39 @@ def test_compare_refs_args_reject_traversal_head(value: str) -> None: CompareRefsArgs(owner="o", repo="r", base="main", head=value) +@pytest.mark.asyncio +async def test_list_user_repositories_scopes_by_uid() -> None: + """User-scoped listing resolves the uid and filters repo search by it.""" + client = GiteaClient(token="service-pat") + captured: dict = {} + + async def fake_request(method: str, endpoint: str, **kwargs): + if endpoint == "/api/v1/users/alice": + return {"id": 7, "login": "alice"} + if endpoint == "/api/v1/repos/search": + captured["params"] = kwargs.get("params") + return {"ok": True, "data": [{"full_name": "alice/demo"}, "not-a-dict"]} + return {} + + client._request = AsyncMock(side_effect=fake_request) # type: ignore[method-assign] + + repos = await client.list_user_repositories("alice") + assert captured["params"]["uid"] == 7 + assert repos == [{"full_name": "alice/demo"}] + + +@pytest.mark.asyncio +async def test_list_user_repositories_unknown_user_returns_empty() -> None: + """A user that cannot be resolved yields an empty list, not an error.""" + client = GiteaClient(token="service-pat") + + async def fake_request(method: str, endpoint: str, **kwargs): + return {} # no id field + + client._request = AsyncMock(side_effect=fake_request) # type: ignore[method-assign] + assert await client.list_user_repositories("ghost") == [] + + def test_git_refs_allow_slash_containing_refs() -> None: """Legitimate refs that contain '/' validate successfully.""" tree = FileTreeArgs(owner="o", repo="r", ref="feature/foo") diff --git a/tests/test_repository_tools.py b/tests/test_repository_tools.py index 651fedb..d1d0a90 100644 --- a/tests/test_repository_tools.py +++ b/tests/test_repository_tools.py @@ -10,6 +10,7 @@ from aegis_gitea_mcp.gitea_client import ( GiteaAuthorizationError, GiteaError, ) +from aegis_gitea_mcp.request_context import clear_gitea_auth_context, set_gitea_user_login from aegis_gitea_mcp.tools.repository import ( get_file_contents_tool, get_file_tree_tool, @@ -91,6 +92,28 @@ async def test_tool_propagates_auth_errors_unwrapped(auth_error: type[GiteaError await list_repositories_tool(AuthErrorStub(), {}) +class UserScopedStub(RepoStub): + """Stub exposing the per-user listing path used in service-PAT mode.""" + + async def list_user_repositories(self, login): + return [{"name": "mine", "owner": {"login": login}, "full_name": f"{login}/mine"}] + + async def list_repositories(self): + raise AssertionError("PAT mode with a known user must use the user-scoped listing") + + +@pytest.mark.asyncio +async def test_list_repositories_tool_scopes_to_user_in_pat_mode() -> None: + """With a service PAT (GITEA_TOKEN) and a known user, listing is user-scoped.""" + set_gitea_user_login("alice") + try: + result = await list_repositories_tool(UserScopedStub(), {}) + finally: + clear_gitea_auth_context() + assert result["count"] == 1 + assert result["repositories"][0]["full_name"] == "alice/mine" + + @pytest.mark.asyncio async def test_get_repository_info_tool_success() -> None: """Repository info tool returns normalized metadata."""