f0db219ee8
Adds a create_label write-mode tool so labels can be created in a repository
through the MCP server (previously there was no way to define labels, which
blocked attaching labels to issues). Follows the full tool checklist:
- arguments.py: CreateLabelArgs (name, hex color, optional description/exclusive),
with extra=forbid and a hex-color pattern.
- gitea_client.py: create_label() POSTing to /repos/{owner}/{repo}/labels with
url-encoded path segments.
- write_tools.py: create_label_tool handler; normalizes the color to a leading
'#', bounds text output, and lets auth/authz errors surface.
- mcp_protocol.py: register create_label (write_operation=True).
- server.py: wire create_label into TOOL_HANDLERS.
- docs/api-reference.md: document create_label.
- tests: success path, color normalization, and invalid-color rejection.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
733 lines
26 KiB
Python
733 lines
26 KiB
Python
"""Gitea API client with hardened request handling."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
from urllib.parse import quote
|
|
|
|
from httpx import AsyncClient, Response
|
|
|
|
from aegis_gitea_mcp.audit import get_audit_logger
|
|
from aegis_gitea_mcp.config import get_settings
|
|
|
|
|
|
class GiteaError(Exception):
|
|
"""Base exception for Gitea API errors."""
|
|
|
|
|
|
class GiteaAuthenticationError(GiteaError):
|
|
"""Raised when authentication with Gitea fails."""
|
|
|
|
|
|
class GiteaAuthorizationError(GiteaError):
|
|
"""Raised when the authenticated user lacks permission for an operation."""
|
|
|
|
|
|
class GiteaNotFoundError(GiteaError):
|
|
"""Raised when requested resource is not found."""
|
|
|
|
|
|
class GiteaClient:
|
|
"""Client for interacting with Gitea API as the authenticated end-user."""
|
|
|
|
def __init__(self, token: str, base_url: str | None = None) -> None:
|
|
"""Initialize Gitea client.
|
|
|
|
Args:
|
|
token: OAuth access token for the authenticated user.
|
|
base_url: Optional base URL override.
|
|
"""
|
|
self.settings = get_settings()
|
|
self.audit = get_audit_logger()
|
|
self.base_url = (base_url or self.settings.gitea_base_url).rstrip("/")
|
|
self.token = token.strip()
|
|
if not self.token:
|
|
raise ValueError("GiteaClient requires a non-empty per-user OAuth token")
|
|
self.client: AsyncClient | None = None
|
|
|
|
async def __aenter__(self) -> GiteaClient:
|
|
"""Create async HTTP client context."""
|
|
self.client = AsyncClient(
|
|
base_url=self.base_url,
|
|
headers={
|
|
"Authorization": f"Bearer {self.token}",
|
|
"Content-Type": "application/json",
|
|
},
|
|
timeout=self.settings.request_timeout_seconds,
|
|
follow_redirects=True,
|
|
)
|
|
return self
|
|
|
|
async def __aexit__(self, *args: Any) -> None:
|
|
"""Close async HTTP client context."""
|
|
if self.client:
|
|
await self.client.aclose()
|
|
|
|
def _ensure_client(self) -> AsyncClient:
|
|
"""Return initialized HTTP client.
|
|
|
|
Raises:
|
|
RuntimeError: If called outside async context manager.
|
|
"""
|
|
if not self.client:
|
|
raise RuntimeError("Client not initialized - use async context manager")
|
|
return self.client
|
|
|
|
def _handle_response(self, response: Response, correlation_id: str) -> Any:
|
|
"""Handle HTTP response and map to domain exceptions."""
|
|
if response.status_code == 401:
|
|
self.audit.log_security_event(
|
|
event_type="authentication_failure",
|
|
description="Gitea API returned 401 Unauthorized",
|
|
severity="high",
|
|
metadata={"correlation_id": correlation_id},
|
|
)
|
|
raise GiteaAuthenticationError("Authentication failed - user token rejected")
|
|
|
|
if response.status_code == 403:
|
|
self.audit.log_access_denied(
|
|
tool_name="gitea_api",
|
|
reason="authenticated user lacks permission",
|
|
correlation_id=correlation_id,
|
|
)
|
|
raise GiteaAuthorizationError("Authenticated user lacks permission for this operation")
|
|
|
|
if response.status_code == 404:
|
|
raise GiteaNotFoundError("Resource not found")
|
|
|
|
if response.status_code >= 400:
|
|
error_msg = f"Gitea API error: {response.status_code}"
|
|
try:
|
|
error_data = response.json()
|
|
message = error_data.get("message") if isinstance(error_data, dict) else None
|
|
if message:
|
|
error_msg = f"{error_msg} - {message}"
|
|
except Exception:
|
|
pass
|
|
raise GiteaError(error_msg)
|
|
|
|
try:
|
|
return response.json()
|
|
except Exception:
|
|
return {}
|
|
|
|
async def _request(
|
|
self,
|
|
method: str,
|
|
endpoint: str,
|
|
*,
|
|
correlation_id: str,
|
|
params: dict[str, Any] | None = None,
|
|
json_body: dict[str, Any] | None = None,
|
|
) -> Any:
|
|
"""Execute a request to Gitea API with shared error handling."""
|
|
client = self._ensure_client()
|
|
response = await client.request(method=method, url=endpoint, params=params, json=json_body)
|
|
return self._handle_response(response, correlation_id)
|
|
|
|
async def get_current_user(self) -> dict[str, Any]:
|
|
"""Get current authenticated user profile."""
|
|
correlation_id = self.audit.log_tool_invocation(
|
|
tool_name="get_current_user",
|
|
result_status="pending",
|
|
)
|
|
try:
|
|
result = await self._request("GET", "/api/v1/user", correlation_id=correlation_id)
|
|
self.audit.log_tool_invocation(
|
|
tool_name="get_current_user",
|
|
correlation_id=correlation_id,
|
|
result_status="success",
|
|
)
|
|
return result if isinstance(result, dict) else {}
|
|
except Exception as exc:
|
|
self.audit.log_tool_invocation(
|
|
tool_name="get_current_user",
|
|
correlation_id=correlation_id,
|
|
result_status="error",
|
|
error=str(exc),
|
|
)
|
|
raise
|
|
|
|
async def list_repositories(self) -> list[dict[str, Any]]:
|
|
"""List repositories visible to the authenticated user."""
|
|
correlation_id = self.audit.log_tool_invocation(
|
|
tool_name="list_repositories",
|
|
result_status="pending",
|
|
)
|
|
try:
|
|
result = await self._request("GET", "/api/v1/user/repos", correlation_id=correlation_id)
|
|
repositories = result if isinstance(result, list) else []
|
|
self.audit.log_tool_invocation(
|
|
tool_name="list_repositories",
|
|
correlation_id=correlation_id,
|
|
result_status="success",
|
|
params={"count": len(repositories)},
|
|
)
|
|
return repositories
|
|
except Exception as exc:
|
|
self.audit.log_tool_invocation(
|
|
tool_name="list_repositories",
|
|
correlation_id=correlation_id,
|
|
result_status="error",
|
|
error=str(exc),
|
|
)
|
|
raise
|
|
|
|
async def list_user_repositories(self, login: str) -> list[dict[str, Any]]:
|
|
"""List repositories the given user owns or contributes to.
|
|
|
|
Used in service-PAT mode so ``list_repositories`` returns repositories
|
|
scoped to the authenticated user instead of everything the service token
|
|
can see. Resolves the user id, then queries Gitea's repo search with the
|
|
``uid`` filter. Visibility of private repos still depends on what the
|
|
service token itself can see.
|
|
"""
|
|
correlation_id = self.audit.log_tool_invocation(
|
|
tool_name="list_user_repositories",
|
|
result_status="pending",
|
|
)
|
|
try:
|
|
user = await self._request(
|
|
"GET",
|
|
f"/api/v1/users/{quote(login, safe='')}",
|
|
correlation_id=correlation_id,
|
|
)
|
|
uid = user.get("id") if isinstance(user, dict) else None
|
|
if not isinstance(uid, int):
|
|
return []
|
|
|
|
result = await self._request(
|
|
"GET",
|
|
"/api/v1/repos/search",
|
|
params={"uid": uid, "limit": 50, "page": 1},
|
|
correlation_id=correlation_id,
|
|
)
|
|
repositories: list[dict[str, Any]] = []
|
|
if isinstance(result, dict):
|
|
data = result.get("data", [])
|
|
if isinstance(data, list):
|
|
repositories = [item for item in data if isinstance(item, dict)]
|
|
|
|
self.audit.log_tool_invocation(
|
|
tool_name="list_user_repositories",
|
|
correlation_id=correlation_id,
|
|
result_status="success",
|
|
params={"login": login, "count": len(repositories)},
|
|
)
|
|
return repositories
|
|
except Exception as exc:
|
|
self.audit.log_tool_invocation(
|
|
tool_name="list_user_repositories",
|
|
correlation_id=correlation_id,
|
|
result_status="error",
|
|
error=str(exc),
|
|
)
|
|
raise
|
|
|
|
async def get_repository(self, owner: str, repo: str) -> dict[str, Any]:
|
|
"""Get repository metadata."""
|
|
repo_id = f"{owner}/{repo}"
|
|
enc_owner = quote(owner, safe="")
|
|
enc_repo = quote(repo, safe="")
|
|
correlation_id = self.audit.log_tool_invocation(
|
|
tool_name="get_repository",
|
|
repository=repo_id,
|
|
result_status="pending",
|
|
)
|
|
try:
|
|
result = await self._request(
|
|
"GET",
|
|
f"/api/v1/repos/{enc_owner}/{enc_repo}",
|
|
correlation_id=correlation_id,
|
|
)
|
|
self.audit.log_tool_invocation(
|
|
tool_name="get_repository",
|
|
repository=repo_id,
|
|
correlation_id=correlation_id,
|
|
result_status="success",
|
|
)
|
|
return result if isinstance(result, dict) else {}
|
|
except Exception as exc:
|
|
self.audit.log_tool_invocation(
|
|
tool_name="get_repository",
|
|
repository=repo_id,
|
|
correlation_id=correlation_id,
|
|
result_status="error",
|
|
error=str(exc),
|
|
)
|
|
raise
|
|
|
|
async def get_file_contents(
|
|
self,
|
|
owner: str,
|
|
repo: str,
|
|
filepath: str,
|
|
ref: str = "main",
|
|
) -> dict[str, Any]:
|
|
"""Get file contents from a repository."""
|
|
repo_id = f"{owner}/{repo}"
|
|
enc_owner = quote(owner, safe="")
|
|
enc_repo = quote(repo, safe="")
|
|
enc_filepath = quote(filepath, safe="/")
|
|
correlation_id = self.audit.log_tool_invocation(
|
|
tool_name="get_file_contents",
|
|
repository=repo_id,
|
|
target=filepath,
|
|
params={"ref": ref},
|
|
result_status="pending",
|
|
)
|
|
try:
|
|
result = await self._request(
|
|
"GET",
|
|
f"/api/v1/repos/{enc_owner}/{enc_repo}/contents/{enc_filepath}",
|
|
params={"ref": ref},
|
|
correlation_id=correlation_id,
|
|
)
|
|
|
|
if not isinstance(result, dict):
|
|
raise GiteaError("Unexpected response type for file contents")
|
|
|
|
file_size = int(result.get("size", 0))
|
|
if file_size > self.settings.max_file_size_bytes:
|
|
error_msg = (
|
|
f"File size ({file_size} bytes) exceeds limit "
|
|
f"({self.settings.max_file_size_bytes} bytes)"
|
|
)
|
|
self.audit.log_security_event(
|
|
event_type="file_size_limit_exceeded",
|
|
description=error_msg,
|
|
severity="low",
|
|
metadata={
|
|
"repository": repo_id,
|
|
"filepath": filepath,
|
|
"file_size": file_size,
|
|
"limit": self.settings.max_file_size_bytes,
|
|
},
|
|
)
|
|
raise GiteaError(error_msg)
|
|
|
|
self.audit.log_tool_invocation(
|
|
tool_name="get_file_contents",
|
|
repository=repo_id,
|
|
target=filepath,
|
|
correlation_id=correlation_id,
|
|
result_status="success",
|
|
params={"ref": ref, "size": file_size},
|
|
)
|
|
return result
|
|
except Exception as exc:
|
|
self.audit.log_tool_invocation(
|
|
tool_name="get_file_contents",
|
|
repository=repo_id,
|
|
target=filepath,
|
|
correlation_id=correlation_id,
|
|
result_status="error",
|
|
error=str(exc),
|
|
)
|
|
raise
|
|
|
|
async def get_tree(
|
|
self,
|
|
owner: str,
|
|
repo: str,
|
|
ref: str = "main",
|
|
recursive: bool = False,
|
|
) -> dict[str, Any]:
|
|
"""Get repository tree at given ref."""
|
|
repo_id = f"{owner}/{repo}"
|
|
enc_owner = quote(owner, safe="")
|
|
enc_repo = quote(repo, safe="")
|
|
enc_ref = quote(ref, safe="/")
|
|
correlation_id = self.audit.log_tool_invocation(
|
|
tool_name="get_tree",
|
|
repository=repo_id,
|
|
params={"ref": ref, "recursive": recursive},
|
|
result_status="pending",
|
|
)
|
|
try:
|
|
result = await self._request(
|
|
"GET",
|
|
f"/api/v1/repos/{enc_owner}/{enc_repo}/git/trees/{enc_ref}",
|
|
params={"recursive": str(recursive).lower()},
|
|
correlation_id=correlation_id,
|
|
)
|
|
tree_data = result if isinstance(result, dict) else {}
|
|
self.audit.log_tool_invocation(
|
|
tool_name="get_tree",
|
|
repository=repo_id,
|
|
correlation_id=correlation_id,
|
|
result_status="success",
|
|
params={
|
|
"ref": ref,
|
|
"recursive": recursive,
|
|
"count": len(tree_data.get("tree", [])),
|
|
},
|
|
)
|
|
return tree_data
|
|
except Exception as exc:
|
|
self.audit.log_tool_invocation(
|
|
tool_name="get_tree",
|
|
repository=repo_id,
|
|
correlation_id=correlation_id,
|
|
result_status="error",
|
|
error=str(exc),
|
|
)
|
|
raise
|
|
|
|
async def search_code(
|
|
self,
|
|
owner: str,
|
|
repo: str,
|
|
query: str,
|
|
*,
|
|
ref: str,
|
|
page: int,
|
|
limit: int,
|
|
) -> dict[str, Any]:
|
|
"""Search repository code by query."""
|
|
correlation_id = self.audit.log_tool_invocation(
|
|
tool_name="search_code",
|
|
repository=f"{owner}/{repo}",
|
|
params={"query": query, "ref": ref, "page": page, "limit": limit},
|
|
result_status="pending",
|
|
)
|
|
try:
|
|
result = await self._request(
|
|
"GET",
|
|
f"/api/v1/repos/{quote(owner, safe='')}/{quote(repo, safe='')}/search",
|
|
params={"q": query, "page": page, "limit": limit, "ref": ref},
|
|
correlation_id=correlation_id,
|
|
)
|
|
self.audit.log_tool_invocation(
|
|
tool_name="search_code",
|
|
repository=f"{owner}/{repo}",
|
|
correlation_id=correlation_id,
|
|
result_status="success",
|
|
)
|
|
return result if isinstance(result, dict) else {}
|
|
except Exception as exc:
|
|
self.audit.log_tool_invocation(
|
|
tool_name="search_code",
|
|
repository=f"{owner}/{repo}",
|
|
correlation_id=correlation_id,
|
|
result_status="error",
|
|
error=str(exc),
|
|
)
|
|
raise
|
|
|
|
async def list_commits(
|
|
self,
|
|
owner: str,
|
|
repo: str,
|
|
*,
|
|
ref: str,
|
|
page: int,
|
|
limit: int,
|
|
) -> list[dict[str, Any]]:
|
|
"""List commits for a repository ref."""
|
|
result = await self._request(
|
|
"GET",
|
|
f"/api/v1/repos/{quote(owner, safe='')}/{quote(repo, safe='')}/commits",
|
|
params={"sha": ref, "page": page, "limit": limit},
|
|
correlation_id=str(
|
|
self.audit.log_tool_invocation(tool_name="list_commits", result_status="pending")
|
|
),
|
|
)
|
|
return result if isinstance(result, list) else []
|
|
|
|
async def get_commit_diff(self, owner: str, repo: str, sha: str) -> dict[str, Any]:
|
|
"""Get detailed commit including changed files and patch metadata."""
|
|
enc_owner = quote(owner, safe="")
|
|
enc_repo = quote(repo, safe="")
|
|
enc_sha = quote(sha, safe="/")
|
|
result = await self._request(
|
|
"GET",
|
|
f"/api/v1/repos/{enc_owner}/{enc_repo}/git/commits/{enc_sha}",
|
|
correlation_id=str(
|
|
self.audit.log_tool_invocation(tool_name="get_commit_diff", result_status="pending")
|
|
),
|
|
)
|
|
return result if isinstance(result, dict) else {}
|
|
|
|
async def compare_refs(self, owner: str, repo: str, base: str, head: str) -> dict[str, Any]:
|
|
"""Compare two refs and return commit/file deltas."""
|
|
enc_owner = quote(owner, safe="")
|
|
enc_repo = quote(repo, safe="")
|
|
enc_base = quote(base, safe="/")
|
|
enc_head = quote(head, safe="/")
|
|
result = await self._request(
|
|
"GET",
|
|
f"/api/v1/repos/{enc_owner}/{enc_repo}/compare/{enc_base}...{enc_head}",
|
|
correlation_id=str(
|
|
self.audit.log_tool_invocation(tool_name="compare_refs", result_status="pending")
|
|
),
|
|
)
|
|
return result if isinstance(result, dict) else {}
|
|
|
|
async def list_issues(
|
|
self,
|
|
owner: str,
|
|
repo: str,
|
|
*,
|
|
state: str,
|
|
page: int,
|
|
limit: int,
|
|
labels: list[str] | None = None,
|
|
) -> list[dict[str, Any]]:
|
|
"""List repository issues."""
|
|
params: dict[str, Any] = {"state": state, "page": page, "limit": limit}
|
|
if labels:
|
|
params["labels"] = ",".join(labels)
|
|
|
|
result = await self._request(
|
|
"GET",
|
|
f"/api/v1/repos/{quote(owner, safe='')}/{quote(repo, safe='')}/issues",
|
|
params=params,
|
|
correlation_id=str(
|
|
self.audit.log_tool_invocation(tool_name="list_issues", result_status="pending")
|
|
),
|
|
)
|
|
return result if isinstance(result, list) else []
|
|
|
|
async def get_issue(self, owner: str, repo: str, index: int) -> dict[str, Any]:
|
|
"""Get issue details."""
|
|
result = await self._request(
|
|
"GET",
|
|
f"/api/v1/repos/{quote(owner, safe='')}/{quote(repo, safe='')}/issues/{index}",
|
|
correlation_id=str(
|
|
self.audit.log_tool_invocation(tool_name="get_issue", result_status="pending")
|
|
),
|
|
)
|
|
return result if isinstance(result, dict) else {}
|
|
|
|
async def list_pull_requests(
|
|
self,
|
|
owner: str,
|
|
repo: str,
|
|
*,
|
|
state: str,
|
|
page: int,
|
|
limit: int,
|
|
) -> list[dict[str, Any]]:
|
|
"""List pull requests for repository."""
|
|
result = await self._request(
|
|
"GET",
|
|
f"/api/v1/repos/{quote(owner, safe='')}/{quote(repo, safe='')}/pulls",
|
|
params={"state": state, "page": page, "limit": limit},
|
|
correlation_id=str(
|
|
self.audit.log_tool_invocation(
|
|
tool_name="list_pull_requests", result_status="pending"
|
|
)
|
|
),
|
|
)
|
|
return result if isinstance(result, list) else []
|
|
|
|
async def get_pull_request(self, owner: str, repo: str, index: int) -> dict[str, Any]:
|
|
"""Get a single pull request."""
|
|
result = await self._request(
|
|
"GET",
|
|
f"/api/v1/repos/{quote(owner, safe='')}/{quote(repo, safe='')}/pulls/{index}",
|
|
correlation_id=str(
|
|
self.audit.log_tool_invocation(
|
|
tool_name="get_pull_request", result_status="pending"
|
|
)
|
|
),
|
|
)
|
|
return result if isinstance(result, dict) else {}
|
|
|
|
async def list_labels(
|
|
self, owner: str, repo: str, *, page: int, limit: int
|
|
) -> list[dict[str, Any]]:
|
|
"""List repository labels."""
|
|
result = await self._request(
|
|
"GET",
|
|
f"/api/v1/repos/{quote(owner, safe='')}/{quote(repo, safe='')}/labels",
|
|
params={"page": page, "limit": limit},
|
|
correlation_id=str(
|
|
self.audit.log_tool_invocation(tool_name="list_labels", result_status="pending")
|
|
),
|
|
)
|
|
return result if isinstance(result, list) else []
|
|
|
|
async def list_tags(
|
|
self, owner: str, repo: str, *, page: int, limit: int
|
|
) -> list[dict[str, Any]]:
|
|
"""List repository tags."""
|
|
result = await self._request(
|
|
"GET",
|
|
f"/api/v1/repos/{quote(owner, safe='')}/{quote(repo, safe='')}/tags",
|
|
params={"page": page, "limit": limit},
|
|
correlation_id=str(
|
|
self.audit.log_tool_invocation(tool_name="list_tags", result_status="pending")
|
|
),
|
|
)
|
|
return result if isinstance(result, list) else []
|
|
|
|
async def list_releases(
|
|
self,
|
|
owner: str,
|
|
repo: str,
|
|
*,
|
|
page: int,
|
|
limit: int,
|
|
) -> list[dict[str, Any]]:
|
|
"""List repository releases."""
|
|
result = await self._request(
|
|
"GET",
|
|
f"/api/v1/repos/{quote(owner, safe='')}/{quote(repo, safe='')}/releases",
|
|
params={"page": page, "limit": limit},
|
|
correlation_id=str(
|
|
self.audit.log_tool_invocation(tool_name="list_releases", result_status="pending")
|
|
),
|
|
)
|
|
return result if isinstance(result, list) else []
|
|
|
|
async def create_issue(
|
|
self,
|
|
owner: str,
|
|
repo: str,
|
|
*,
|
|
title: str,
|
|
body: str,
|
|
labels: list[str] | None = None,
|
|
assignees: list[str] | None = None,
|
|
) -> dict[str, Any]:
|
|
"""Create repository issue."""
|
|
payload: dict[str, Any] = {"title": title, "body": body}
|
|
if labels:
|
|
payload["labels"] = labels
|
|
if assignees:
|
|
payload["assignees"] = assignees
|
|
result = await self._request(
|
|
"POST",
|
|
f"/api/v1/repos/{quote(owner, safe='')}/{quote(repo, safe='')}/issues",
|
|
json_body=payload,
|
|
correlation_id=str(
|
|
self.audit.log_tool_invocation(tool_name="create_issue", result_status="pending")
|
|
),
|
|
)
|
|
return result if isinstance(result, dict) else {}
|
|
|
|
async def update_issue(
|
|
self,
|
|
owner: str,
|
|
repo: str,
|
|
index: int,
|
|
*,
|
|
title: str | None = None,
|
|
body: str | None = None,
|
|
state: str | None = None,
|
|
) -> dict[str, Any]:
|
|
"""Update issue fields."""
|
|
payload: dict[str, Any] = {}
|
|
if title is not None:
|
|
payload["title"] = title
|
|
if body is not None:
|
|
payload["body"] = body
|
|
if state is not None:
|
|
payload["state"] = state
|
|
result = await self._request(
|
|
"PATCH",
|
|
f"/api/v1/repos/{quote(owner, safe='')}/{quote(repo, safe='')}/issues/{index}",
|
|
json_body=payload,
|
|
correlation_id=str(
|
|
self.audit.log_tool_invocation(tool_name="update_issue", result_status="pending")
|
|
),
|
|
)
|
|
return result if isinstance(result, dict) else {}
|
|
|
|
async def create_issue_comment(
|
|
self, owner: str, repo: str, index: int, body: str
|
|
) -> dict[str, Any]:
|
|
"""Create a comment on issue (and PR discussion if issue index refers to PR)."""
|
|
result = await self._request(
|
|
"POST",
|
|
f"/api/v1/repos/{quote(owner, safe='')}/{quote(repo, safe='')}/issues/{index}/comments",
|
|
json_body={"body": body},
|
|
correlation_id=str(
|
|
self.audit.log_tool_invocation(
|
|
tool_name="create_issue_comment", result_status="pending"
|
|
)
|
|
),
|
|
)
|
|
return result if isinstance(result, dict) else {}
|
|
|
|
async def create_pr_comment(
|
|
self, owner: str, repo: str, index: int, body: str
|
|
) -> dict[str, Any]:
|
|
"""Create PR discussion comment."""
|
|
result = await self._request(
|
|
"POST",
|
|
f"/api/v1/repos/{quote(owner, safe='')}/{quote(repo, safe='')}/issues/{index}/comments",
|
|
json_body={"body": body},
|
|
correlation_id=str(
|
|
self.audit.log_tool_invocation(
|
|
tool_name="create_pr_comment", result_status="pending"
|
|
)
|
|
),
|
|
)
|
|
return result if isinstance(result, dict) else {}
|
|
|
|
async def create_label(
|
|
self,
|
|
owner: str,
|
|
repo: str,
|
|
*,
|
|
name: str,
|
|
color: str,
|
|
description: str = "",
|
|
exclusive: bool = False,
|
|
) -> dict[str, Any]:
|
|
"""Create a repository label."""
|
|
payload: dict[str, Any] = {
|
|
"name": name,
|
|
"color": color,
|
|
"description": description,
|
|
"exclusive": exclusive,
|
|
}
|
|
result = await self._request(
|
|
"POST",
|
|
f"/api/v1/repos/{quote(owner, safe='')}/{quote(repo, safe='')}/labels",
|
|
json_body=payload,
|
|
correlation_id=str(
|
|
self.audit.log_tool_invocation(tool_name="create_label", result_status="pending")
|
|
),
|
|
)
|
|
return result if isinstance(result, dict) else {}
|
|
|
|
async def add_labels(
|
|
self,
|
|
owner: str,
|
|
repo: str,
|
|
index: int,
|
|
labels: list[str],
|
|
) -> dict[str, Any]:
|
|
"""Add labels to issue/PR."""
|
|
result = await self._request(
|
|
"POST",
|
|
f"/api/v1/repos/{quote(owner, safe='')}/{quote(repo, safe='')}/issues/{index}/labels",
|
|
json_body={"labels": labels},
|
|
correlation_id=str(
|
|
self.audit.log_tool_invocation(tool_name="add_labels", result_status="pending")
|
|
),
|
|
)
|
|
return result if isinstance(result, dict) else {}
|
|
|
|
async def assign_issue(
|
|
self,
|
|
owner: str,
|
|
repo: str,
|
|
index: int,
|
|
assignees: list[str],
|
|
) -> dict[str, Any]:
|
|
"""Assign users to issue/PR."""
|
|
result = await self._request(
|
|
"POST",
|
|
f"/api/v1/repos/{quote(owner, safe='')}/{quote(repo, safe='')}/issues/{index}/assignees",
|
|
json_body={"assignees": assignees},
|
|
correlation_id=str(
|
|
self.audit.log_tool_invocation(tool_name="assign_issue", result_status="pending")
|
|
),
|
|
)
|
|
return result if isinstance(result, dict) else {}
|