feat: scope list_repositories to the authenticated user in service-PAT mode
Previously list_repositories was blocked in service-PAT mode because it has no repository target for the per-user permission check, so users could not list their repositories at all (the connector surfaced a generic error). list_repositories now returns only the repositories the signed-in user owns or contributes to, instead of everything the bot token can see: - gitea_client.py: add list_user_repositories(login) — resolves the user id and queries /api/v1/repos/search with the uid filter. - repository.py: list_repositories_tool uses the user-scoped path when a service PAT is configured and a user login is present; pure-OAuth mode still uses the user's own /user/repos. - server.py: allow list_repositories through the service-PAT guard (it is scoped to the user in the handler); all other tools still require a repository target. - README.md: document the new user-scoped behavior and its visibility caveat. Tests: user-scoped client method (uid resolution + unknown user), PAT-mode tool scoping, and conftest now clears the request context between tests to prevent contextvar login leakage across files. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -74,7 +74,7 @@ GITEA_TOKEN=<bot-personal-access-token>
|
||||
|
||||
This does **not** weaken per-user security. OAuth remains authoritative: before every repository call the server verifies that the signed-in user has permission on the target repo through Gitea (`_verify_user_repository_access`) and denies it otherwise. The PAT only performs the API call after that check; OAuth provides identity, per-user authorization, and audit attribution.
|
||||
|
||||
Note: with a service PAT, `list_repositories` is intentionally blocked because it has no repository target to authorize per user — use the repository-scoped tools (`get_repository_info`, `get_file_contents`, `list_issues`, …) instead.
|
||||
Note: with a service PAT, `list_repositories` is **scoped to the signed-in user** — it returns only the repositories that user owns or contributes to (resolved via Gitea's repo search with the `uid` filter), not everything the bot can see. Visibility of private repos still depends on what the service token itself can access. All other tools require an explicit `owner`/`repo` and run the per-user permission check first.
|
||||
|
||||
### 2a) Required writable volumes (read-only container)
|
||||
|
||||
|
||||
@@ -173,6 +173,57 @@ class GiteaClient:
|
||||
)
|
||||
raise
|
||||
|
||||
async def list_user_repositories(self, login: str) -> list[dict[str, Any]]:
|
||||
"""List repositories the given user owns or contributes to.
|
||||
|
||||
Used in service-PAT mode so ``list_repositories`` returns repositories
|
||||
scoped to the authenticated user instead of everything the service token
|
||||
can see. Resolves the user id, then queries Gitea's repo search with the
|
||||
``uid`` filter. Visibility of private repos still depends on what the
|
||||
service token itself can see.
|
||||
"""
|
||||
correlation_id = self.audit.log_tool_invocation(
|
||||
tool_name="list_user_repositories",
|
||||
result_status="pending",
|
||||
)
|
||||
try:
|
||||
user = await self._request(
|
||||
"GET",
|
||||
f"/api/v1/users/{quote(login, safe='')}",
|
||||
correlation_id=correlation_id,
|
||||
)
|
||||
uid = user.get("id") if isinstance(user, dict) else None
|
||||
if not isinstance(uid, int):
|
||||
return []
|
||||
|
||||
result = await self._request(
|
||||
"GET",
|
||||
"/api/v1/repos/search",
|
||||
params={"uid": uid, "limit": 50, "page": 1},
|
||||
correlation_id=correlation_id,
|
||||
)
|
||||
repositories: list[dict[str, Any]] = []
|
||||
if isinstance(result, dict):
|
||||
data = result.get("data", [])
|
||||
if isinstance(data, list):
|
||||
repositories = [item for item in data if isinstance(item, dict)]
|
||||
|
||||
self.audit.log_tool_invocation(
|
||||
tool_name="list_user_repositories",
|
||||
correlation_id=correlation_id,
|
||||
result_status="success",
|
||||
params={"login": login, "count": len(repositories)},
|
||||
)
|
||||
return repositories
|
||||
except Exception as exc:
|
||||
self.audit.log_tool_invocation(
|
||||
tool_name="list_user_repositories",
|
||||
correlation_id=correlation_id,
|
||||
result_status="error",
|
||||
error=str(exc),
|
||||
)
|
||||
raise
|
||||
|
||||
async def get_repository(self, owner: str, repo: str) -> dict[str, Any]:
|
||||
"""Get repository metadata."""
|
||||
repo_id = f"{owner}/{repo}"
|
||||
|
||||
@@ -1165,26 +1165,32 @@ async def _execute_tool_call(
|
||||
|
||||
if settings.gitea_token.strip():
|
||||
if not repository:
|
||||
audit.log_access_denied(
|
||||
tool_name=tool_name,
|
||||
reason="service_pat_requires_repository_target",
|
||||
# list_repositories is not repo-scoped; the handler scopes it to
|
||||
# the authenticated user's own repositories instead. Every other
|
||||
# tool requires a repository target so per-user permission can be
|
||||
# verified before the privileged service PAT is used.
|
||||
if tool_name != "list_repositories":
|
||||
audit.log_access_denied(
|
||||
tool_name=tool_name,
|
||||
reason="service_pat_requires_repository_target",
|
||||
correlation_id=correlation_id,
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=403,
|
||||
detail=(
|
||||
"Service PAT mode requires a repository target so per-user "
|
||||
"permission can be verified."
|
||||
),
|
||||
)
|
||||
else:
|
||||
user_login = get_gitea_user_login()
|
||||
await _verify_user_repository_access(
|
||||
repository=repository,
|
||||
required_scope=required_scope,
|
||||
user_login=user_login or "",
|
||||
correlation_id=correlation_id,
|
||||
tool_name=tool_name,
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=403,
|
||||
detail=(
|
||||
"Service PAT mode requires a repository target so per-user "
|
||||
"permission can be verified."
|
||||
),
|
||||
)
|
||||
user_login = get_gitea_user_login()
|
||||
await _verify_user_repository_access(
|
||||
repository=repository,
|
||||
required_scope=required_scope,
|
||||
user_login=user_login or "",
|
||||
correlation_id=correlation_id,
|
||||
tool_name=tool_name,
|
||||
)
|
||||
|
||||
# In OAuth mode, Gitea OIDC access_tokens can't call the Gitea REST API
|
||||
# (they only carry OIDC scopes). If a service PAT is configured via
|
||||
|
||||
@@ -6,12 +6,14 @@ import base64
|
||||
import binascii
|
||||
from typing import Any
|
||||
|
||||
from aegis_gitea_mcp.config import get_settings
|
||||
from aegis_gitea_mcp.gitea_client import (
|
||||
GiteaAuthenticationError,
|
||||
GiteaAuthorizationError,
|
||||
GiteaClient,
|
||||
GiteaError,
|
||||
)
|
||||
from aegis_gitea_mcp.request_context import get_gitea_user_login
|
||||
from aegis_gitea_mcp.response_limits import limit_items, limit_text
|
||||
from aegis_gitea_mcp.security import sanitize_untrusted_text
|
||||
from aegis_gitea_mcp.tools.arguments import (
|
||||
@@ -33,8 +35,16 @@ async def list_repositories_tool(gitea: GiteaClient, arguments: dict[str, Any])
|
||||
Response payload with bounded repository list.
|
||||
"""
|
||||
ListRepositoriesArgs.model_validate(arguments)
|
||||
settings = get_settings()
|
||||
login = get_gitea_user_login()
|
||||
try:
|
||||
repositories = await gitea.list_repositories()
|
||||
# In service-PAT mode the API token is the bot's, so scope the listing to
|
||||
# the authenticated user's own repositories. In pure-OAuth mode the API
|
||||
# token already belongs to the user, so Gitea scopes /user/repos for us.
|
||||
if settings.gitea_token.strip() and login and login != "unknown":
|
||||
repositories = await gitea.list_user_repositories(login)
|
||||
else:
|
||||
repositories = await gitea.list_repositories()
|
||||
simplified = [
|
||||
{
|
||||
"owner": repo.get("owner", {}).get("login", ""),
|
||||
|
||||
@@ -13,6 +13,7 @@ from aegis_gitea_mcp.oauth_flow import reset_oauth_client_registry
|
||||
from aegis_gitea_mcp.observability import reset_metrics_registry
|
||||
from aegis_gitea_mcp.policy import reset_policy_engine
|
||||
from aegis_gitea_mcp.rate_limit import reset_rate_limiter
|
||||
from aegis_gitea_mcp.request_context import clear_gitea_auth_context
|
||||
from aegis_gitea_mcp.server import reset_repo_authz_cache
|
||||
|
||||
|
||||
@@ -29,6 +30,7 @@ def reset_globals(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Generator[
|
||||
reset_policy_engine()
|
||||
reset_rate_limiter()
|
||||
reset_metrics_registry()
|
||||
clear_gitea_auth_context()
|
||||
|
||||
# Use temporary directory for audit logs in tests
|
||||
audit_log_path = tmp_path / "audit.log"
|
||||
@@ -46,6 +48,7 @@ def reset_globals(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Generator[
|
||||
reset_policy_engine()
|
||||
reset_rate_limiter()
|
||||
reset_metrics_registry()
|
||||
clear_gitea_auth_context()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
||||
@@ -213,6 +213,39 @@ def test_compare_refs_args_reject_traversal_head(value: str) -> None:
|
||||
CompareRefsArgs(owner="o", repo="r", base="main", head=value)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_user_repositories_scopes_by_uid() -> None:
|
||||
"""User-scoped listing resolves the uid and filters repo search by it."""
|
||||
client = GiteaClient(token="service-pat")
|
||||
captured: dict = {}
|
||||
|
||||
async def fake_request(method: str, endpoint: str, **kwargs):
|
||||
if endpoint == "/api/v1/users/alice":
|
||||
return {"id": 7, "login": "alice"}
|
||||
if endpoint == "/api/v1/repos/search":
|
||||
captured["params"] = kwargs.get("params")
|
||||
return {"ok": True, "data": [{"full_name": "alice/demo"}, "not-a-dict"]}
|
||||
return {}
|
||||
|
||||
client._request = AsyncMock(side_effect=fake_request) # type: ignore[method-assign]
|
||||
|
||||
repos = await client.list_user_repositories("alice")
|
||||
assert captured["params"]["uid"] == 7
|
||||
assert repos == [{"full_name": "alice/demo"}]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_user_repositories_unknown_user_returns_empty() -> None:
|
||||
"""A user that cannot be resolved yields an empty list, not an error."""
|
||||
client = GiteaClient(token="service-pat")
|
||||
|
||||
async def fake_request(method: str, endpoint: str, **kwargs):
|
||||
return {} # no id field
|
||||
|
||||
client._request = AsyncMock(side_effect=fake_request) # type: ignore[method-assign]
|
||||
assert await client.list_user_repositories("ghost") == []
|
||||
|
||||
|
||||
def test_git_refs_allow_slash_containing_refs() -> None:
|
||||
"""Legitimate refs that contain '/' validate successfully."""
|
||||
tree = FileTreeArgs(owner="o", repo="r", ref="feature/foo")
|
||||
|
||||
@@ -10,6 +10,7 @@ from aegis_gitea_mcp.gitea_client import (
|
||||
GiteaAuthorizationError,
|
||||
GiteaError,
|
||||
)
|
||||
from aegis_gitea_mcp.request_context import clear_gitea_auth_context, set_gitea_user_login
|
||||
from aegis_gitea_mcp.tools.repository import (
|
||||
get_file_contents_tool,
|
||||
get_file_tree_tool,
|
||||
@@ -91,6 +92,28 @@ async def test_tool_propagates_auth_errors_unwrapped(auth_error: type[GiteaError
|
||||
await list_repositories_tool(AuthErrorStub(), {})
|
||||
|
||||
|
||||
class UserScopedStub(RepoStub):
|
||||
"""Stub exposing the per-user listing path used in service-PAT mode."""
|
||||
|
||||
async def list_user_repositories(self, login):
|
||||
return [{"name": "mine", "owner": {"login": login}, "full_name": f"{login}/mine"}]
|
||||
|
||||
async def list_repositories(self):
|
||||
raise AssertionError("PAT mode with a known user must use the user-scoped listing")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_repositories_tool_scopes_to_user_in_pat_mode() -> None:
|
||||
"""With a service PAT (GITEA_TOKEN) and a known user, listing is user-scoped."""
|
||||
set_gitea_user_login("alice")
|
||||
try:
|
||||
result = await list_repositories_tool(UserScopedStub(), {})
|
||||
finally:
|
||||
clear_gitea_auth_context()
|
||||
assert result["count"] == 1
|
||||
assert result["repositories"][0]["full_name"] == "alice/mine"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_repository_info_tool_success() -> None:
|
||||
"""Repository info tool returns normalized metadata."""
|
||||
|
||||
Reference in New Issue
Block a user