Files
AegisGitea-MCP/src/aegis_gitea_mcp/tools/read_tools.py
T
Latte 41749fd7b4 fix: harden get_issue parsing and surface real errors (#27); align CI image publish
get_issue raised 'NoneType' object is not iterable on issues whose
labels/assignees Gitea returns as null or with non-dict elements (the #13
class), which reached clients as an opaque JSON-RPC -32603 with no detail.

- read_tools: skip non-dict label/assignee entries in get_issue_tool
- server: detect a wrapped GiteaNotFoundError via the __cause__ chain and
  return 404 / JSON-RPC -32000 with a clear message; include the exception
  type name in masked internal errors so future masked failures are
  diagnosable without exposing messages or stack traces
- tests: cover non-dict collection elements and the not-found / typed-error
  responses
- ci: rewrite docker.yml to build, smoke-test and push the image to the
  Gitea container registry on merge to main/dev, matching the hiddenden.cafe
  pattern (only REGISTRY_TOKEN required)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-25 16:51:58 +02:00

851 lines
32 KiB
Python

"""Extended read-only MCP tools."""
from __future__ import annotations
import logging
from typing import Any
from aegis_gitea_mcp.gitea_client import (
GiteaAuthenticationError,
GiteaAuthorizationError,
GiteaClient,
GiteaError,
)
from aegis_gitea_mcp.logging_utils import log_event, log_nullable_field
from aegis_gitea_mcp.response_limits import limit_items, limit_text
from aegis_gitea_mcp.tools.arguments import (
CommitDiffArgs,
CommitStatusArgs,
CompareRefsArgs,
GetBranchArgs,
GetReleaseArgs,
IssueArgs,
LatestReleaseArgs,
ListBranchesArgs,
ListCommitsArgs,
ListIssueCommentsArgs,
ListIssuesArgs,
ListLabelsArgs,
ListMilestonesArgs,
ListOrganizationsArgs,
ListOrgRepositoriesArgs,
ListPullRequestCommitsArgs,
ListPullRequestFilesArgs,
ListPullRequestsArgs,
ListReleasesArgs,
ListTagsArgs,
PullRequestArgs,
RepoLanguagesArgs,
RepoTopicsArgs,
SearchCodeArgs,
)
logger = logging.getLogger(__name__)
async def search_code_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""Search repository code and return bounded result snippets."""
parsed = SearchCodeArgs.model_validate(arguments)
try:
raw = await gitea.search_code(
parsed.owner,
parsed.repo,
parsed.query,
ref=parsed.ref,
page=parsed.page,
limit=parsed.limit,
)
hits_raw = raw.get("data", raw.get("hits", [])) if isinstance(raw, dict) else []
if not isinstance(hits_raw, list):
hits_raw = []
normalized_hits = []
for item in hits_raw:
if not isinstance(item, dict):
continue
snippet = str(item.get("content", item.get("snippet", "")))
normalized_hits.append(
{
"path": item.get("filename", item.get("path", "")),
"sha": item.get("sha", ""),
"ref": parsed.ref,
"snippet": limit_text(snippet),
"score": item.get("score", 0),
}
)
bounded, omitted = limit_items(normalized_hits, configured_limit=parsed.limit)
return {
"owner": parsed.owner,
"repo": parsed.repo,
"query": parsed.query,
"ref": parsed.ref,
"results": bounded,
"count": len(bounded),
"omitted": omitted,
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
# Let auth/authz failures surface so the server returns actionable
# re-authorization guidance instead of a generic internal error.
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to search code: {exc}") from exc
async def list_commits_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""List commits for a repository reference."""
parsed = ListCommitsArgs.model_validate(arguments)
try:
commits = await gitea.list_commits(
parsed.owner,
parsed.repo,
ref=parsed.ref,
page=parsed.page,
limit=parsed.limit,
)
normalized = [
{
"sha": commit.get("sha", ""),
"message": limit_text(str(commit.get("commit", {}).get("message", ""))),
"author": commit.get("author", {}).get("login", ""),
"created": commit.get("commit", {}).get("author", {}).get("date", ""),
"url": commit.get("html_url", ""),
}
for commit in commits
if isinstance(commit, dict)
]
bounded, omitted = limit_items(normalized, configured_limit=parsed.limit)
return {
"owner": parsed.owner,
"repo": parsed.repo,
"ref": parsed.ref,
"commits": bounded,
"count": len(bounded),
"omitted": omitted,
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
# Let auth/authz failures surface so the server returns actionable
# re-authorization guidance instead of a generic internal error.
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to list commits: {exc}") from exc
async def get_commit_diff_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""Return commit-level file diff metadata."""
parsed = CommitDiffArgs.model_validate(arguments)
try:
commit = await gitea.get_commit_diff(parsed.owner, parsed.repo, parsed.sha)
files = commit.get("files", []) if isinstance(commit, dict) else []
normalized_files = []
if isinstance(files, list):
for item in files:
if not isinstance(item, dict):
continue
normalized_files.append(
{
"filename": item.get("filename", ""),
"status": item.get("status", ""),
"additions": item.get("additions", 0),
"deletions": item.get("deletions", 0),
"changes": item.get("changes", 0),
"patch": limit_text(str(item.get("patch", ""))),
}
)
bounded, omitted = limit_items(normalized_files)
return {
"owner": parsed.owner,
"repo": parsed.repo,
"sha": parsed.sha,
"message": limit_text(
str(commit.get("message", commit.get("commit", {}).get("message", "")))
),
"files": bounded,
"count": len(bounded),
"omitted": omitted,
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
# Let auth/authz failures surface so the server returns actionable
# re-authorization guidance instead of a generic internal error.
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to get commit diff: {exc}") from exc
async def compare_refs_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""Compare two refs and return bounded commit/file changes."""
parsed = CompareRefsArgs.model_validate(arguments)
try:
comparison = await gitea.compare_refs(parsed.owner, parsed.repo, parsed.base, parsed.head)
commits_raw = comparison.get("commits", []) if isinstance(comparison, dict) else []
files_raw = comparison.get("files", []) if isinstance(comparison, dict) else []
commits = [
{
"sha": commit.get("sha", ""),
"message": limit_text(str(commit.get("commit", {}).get("message", ""))),
}
for commit in commits_raw
if isinstance(commit, dict)
]
commit_items, commit_omitted = limit_items(commits)
files = [
{
"filename": item.get("filename", ""),
"status": item.get("status", ""),
"additions": item.get("additions", 0),
"deletions": item.get("deletions", 0),
}
for item in files_raw
if isinstance(item, dict)
]
file_items, file_omitted = limit_items(files)
return {
"owner": parsed.owner,
"repo": parsed.repo,
"base": parsed.base,
"head": parsed.head,
"commits": commit_items,
"files": file_items,
"commit_count": len(commit_items),
"file_count": len(file_items),
"omitted_commits": commit_omitted,
"omitted_files": file_omitted,
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
# Let auth/authz failures surface so the server returns actionable
# re-authorization guidance instead of a generic internal error.
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to compare refs: {exc}") from exc
async def list_issues_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""List issues for repository."""
parsed = ListIssuesArgs.model_validate(arguments)
try:
issues = await gitea.list_issues(
parsed.owner,
parsed.repo,
state=parsed.state,
page=parsed.page,
limit=parsed.limit,
labels=parsed.labels,
)
normalized = [
{
"number": issue.get("number", 0),
"title": limit_text(str(issue.get("title", ""))),
"state": issue.get("state", ""),
"author": issue.get("user", {}).get("login", ""),
"labels": [label.get("name", "") for label in issue.get("labels", [])],
"created_at": issue.get("created_at", ""),
"updated_at": issue.get("updated_at", ""),
"url": issue.get("html_url", ""),
}
for issue in issues
if isinstance(issue, dict)
]
bounded, omitted = limit_items(normalized, configured_limit=parsed.limit)
return {
"owner": parsed.owner,
"repo": parsed.repo,
"state": parsed.state,
"issues": bounded,
"count": len(bounded),
"omitted": omitted,
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
# Let auth/authz failures surface so the server returns actionable
# re-authorization guidance instead of a generic internal error.
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to list issues: {exc}") from exc
async def get_issue_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""Get issue details."""
parsed = IssueArgs.model_validate(arguments)
log_event(
logger,
logging.DEBUG,
"get_issue.start",
owner=parsed.owner,
repo=parsed.repo,
issue_number=parsed.issue_number,
)
try:
issue = await gitea.get_issue(parsed.owner, parsed.repo, parsed.issue_number)
log_event(
logger,
logging.DEBUG,
"get_issue.payload_shape",
top_level_keys=sorted(issue.keys()) if issue else None,
)
# Surface nullable collections that previously broke parsing (see #13).
log_nullable_field(logger, "get_issue.field_check", "labels", issue.get("labels"))
log_nullable_field(logger, "get_issue.field_check", "assignees", issue.get("assignees"))
log_nullable_field(logger, "get_issue.field_check", "user", issue.get("user"))
return {
"number": issue.get("number", 0),
"title": limit_text(str(issue.get("title", ""))),
"body": limit_text(str(issue.get("body", ""))),
"state": issue.get("state", ""),
"author": (issue.get("user") or {}).get("login", ""),
"labels": [
label.get("name", "")
for label in (issue.get("labels") or [])
if isinstance(label, dict)
],
"assignees": [
assignee.get("login", "")
for assignee in (issue.get("assignees") or [])
if isinstance(assignee, dict)
],
"created_at": issue.get("created_at", ""),
"updated_at": issue.get("updated_at", ""),
"url": issue.get("html_url", ""),
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
# Let auth/authz failures surface so the server returns actionable
# re-authorization guidance instead of a generic internal error.
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to get issue: {exc}") from exc
async def list_pull_requests_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""List pull requests."""
parsed = ListPullRequestsArgs.model_validate(arguments)
try:
pull_requests = await gitea.list_pull_requests(
parsed.owner,
parsed.repo,
state=parsed.state,
page=parsed.page,
limit=parsed.limit,
)
normalized = [
{
"number": pull.get("number", 0),
"title": limit_text(str(pull.get("title", ""))),
"state": pull.get("state", ""),
"author": pull.get("user", {}).get("login", ""),
"draft": pull.get("draft", False),
"mergeable": pull.get("mergeable", False),
"created_at": pull.get("created_at", ""),
"updated_at": pull.get("updated_at", ""),
"url": pull.get("html_url", ""),
}
for pull in pull_requests
if isinstance(pull, dict)
]
bounded, omitted = limit_items(normalized, configured_limit=parsed.limit)
return {
"owner": parsed.owner,
"repo": parsed.repo,
"state": parsed.state,
"pull_requests": bounded,
"count": len(bounded),
"omitted": omitted,
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
# Let auth/authz failures surface so the server returns actionable
# re-authorization guidance instead of a generic internal error.
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to list pull requests: {exc}") from exc
async def get_pull_request_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""Get pull request details."""
parsed = PullRequestArgs.model_validate(arguments)
try:
pull = await gitea.get_pull_request(parsed.owner, parsed.repo, parsed.pull_number)
return {
"number": pull.get("number", 0),
"title": limit_text(str(pull.get("title", ""))),
"body": limit_text(str(pull.get("body", ""))),
"state": pull.get("state", ""),
"draft": pull.get("draft", False),
"mergeable": pull.get("mergeable", False),
"author": pull.get("user", {}).get("login", ""),
"base": pull.get("base", {}).get("ref", ""),
"head": pull.get("head", {}).get("ref", ""),
"created_at": pull.get("created_at", ""),
"updated_at": pull.get("updated_at", ""),
"url": pull.get("html_url", ""),
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
# Let auth/authz failures surface so the server returns actionable
# re-authorization guidance instead of a generic internal error.
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to get pull request: {exc}") from exc
async def list_labels_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""List labels configured on repository."""
parsed = ListLabelsArgs.model_validate(arguments)
try:
labels = await gitea.list_labels(
parsed.owner, parsed.repo, page=parsed.page, limit=parsed.limit
)
normalized = [
{
"id": label.get("id", 0),
"name": label.get("name", ""),
"color": label.get("color", ""),
"description": limit_text(str(label.get("description", ""))),
}
for label in labels
if isinstance(label, dict)
]
bounded, omitted = limit_items(normalized, configured_limit=parsed.limit)
return {
"owner": parsed.owner,
"repo": parsed.repo,
"labels": bounded,
"count": len(bounded),
"omitted": omitted,
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
# Let auth/authz failures surface so the server returns actionable
# re-authorization guidance instead of a generic internal error.
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to list labels: {exc}") from exc
async def list_tags_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""List repository tags."""
parsed = ListTagsArgs.model_validate(arguments)
try:
tags = await gitea.list_tags(
parsed.owner, parsed.repo, page=parsed.page, limit=parsed.limit
)
normalized = [
{
"name": tag.get("name", ""),
"commit": tag.get("commit", {}).get("sha", ""),
"zipball_url": tag.get("zipball_url", ""),
"tarball_url": tag.get("tarball_url", ""),
}
for tag in tags
if isinstance(tag, dict)
]
bounded, omitted = limit_items(normalized, configured_limit=parsed.limit)
return {
"owner": parsed.owner,
"repo": parsed.repo,
"tags": bounded,
"count": len(bounded),
"omitted": omitted,
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
# Let auth/authz failures surface so the server returns actionable
# re-authorization guidance instead of a generic internal error.
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to list tags: {exc}") from exc
async def list_releases_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""List repository releases."""
parsed = ListReleasesArgs.model_validate(arguments)
try:
releases = await gitea.list_releases(
parsed.owner,
parsed.repo,
page=parsed.page,
limit=parsed.limit,
)
normalized = [
{
"id": release.get("id", 0),
"tag_name": release.get("tag_name", ""),
"name": limit_text(str(release.get("name", ""))),
"draft": release.get("draft", False),
"prerelease": release.get("prerelease", False),
"body": limit_text(str(release.get("body", ""))),
"created_at": release.get("created_at", ""),
"published_at": release.get("published_at", ""),
"url": release.get("html_url", ""),
}
for release in releases
if isinstance(release, dict)
]
bounded, omitted = limit_items(normalized, configured_limit=parsed.limit)
return {
"owner": parsed.owner,
"repo": parsed.repo,
"releases": bounded,
"count": len(bounded),
"omitted": omitted,
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
# Let auth/authz failures surface so the server returns actionable
# re-authorization guidance instead of a generic internal error.
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to list releases: {exc}") from exc
async def list_pull_request_files_tool(
gitea: GiteaClient, arguments: dict[str, Any]
) -> dict[str, Any]:
"""List files changed in a pull request."""
parsed = ListPullRequestFilesArgs.model_validate(arguments)
try:
files = await gitea.list_pull_request_files(
parsed.owner, parsed.repo, parsed.pull_number, page=parsed.page, limit=parsed.limit
)
normalized = [
{
"filename": item.get("filename", ""),
"status": item.get("status", ""),
"additions": item.get("additions", 0),
"deletions": item.get("deletions", 0),
"changes": item.get("changes", 0),
}
for item in files
if isinstance(item, dict)
]
bounded, omitted = limit_items(normalized, configured_limit=parsed.limit)
return {
"owner": parsed.owner,
"repo": parsed.repo,
"pull_number": parsed.pull_number,
"files": bounded,
"count": len(bounded),
"omitted": omitted,
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to list pull request files: {exc}") from exc
async def list_pull_request_commits_tool(
gitea: GiteaClient, arguments: dict[str, Any]
) -> dict[str, Any]:
"""List commits in a pull request."""
parsed = ListPullRequestCommitsArgs.model_validate(arguments)
try:
commits = await gitea.list_pull_request_commits(
parsed.owner, parsed.repo, parsed.pull_number, page=parsed.page, limit=parsed.limit
)
normalized = [
{
"sha": commit.get("sha", ""),
"message": limit_text(str(commit.get("commit", {}).get("message", ""))),
"author": (
commit.get("author", {}).get("login", "")
if isinstance(commit.get("author"), dict)
else ""
),
}
for commit in commits
if isinstance(commit, dict)
]
bounded, omitted = limit_items(normalized, configured_limit=parsed.limit)
return {
"owner": parsed.owner,
"repo": parsed.repo,
"pull_number": parsed.pull_number,
"commits": bounded,
"count": len(bounded),
"omitted": omitted,
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to list pull request commits: {exc}") from exc
async def list_issue_comments_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""List comments on an issue or pull request."""
parsed = ListIssueCommentsArgs.model_validate(arguments)
try:
comments = await gitea.list_issue_comments(
parsed.owner, parsed.repo, parsed.issue_number, page=parsed.page, limit=parsed.limit
)
normalized = [
{
"id": comment.get("id", 0),
"author": (
comment.get("user", {}).get("login", "")
if isinstance(comment.get("user"), dict)
else ""
),
"body": limit_text(str(comment.get("body", ""))),
"created_at": comment.get("created_at", ""),
"updated_at": comment.get("updated_at", ""),
"url": comment.get("html_url", ""),
}
for comment in comments
if isinstance(comment, dict)
]
bounded, omitted = limit_items(normalized, configured_limit=parsed.limit)
return {
"owner": parsed.owner,
"repo": parsed.repo,
"issue_number": parsed.issue_number,
"comments": bounded,
"count": len(bounded),
"omitted": omitted,
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to list issue comments: {exc}") from exc
def _normalize_branch(branch: dict[str, Any]) -> dict[str, Any]:
"""Normalize a Gitea branch payload."""
commit = branch.get("commit", {}) if isinstance(branch.get("commit"), dict) else {}
return {
"name": branch.get("name", ""),
"protected": branch.get("protected", False),
"commit": commit.get("id", ""),
}
async def list_branches_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""List repository branches."""
parsed = ListBranchesArgs.model_validate(arguments)
try:
branches = await gitea.list_branches(
parsed.owner, parsed.repo, page=parsed.page, limit=parsed.limit
)
normalized = [_normalize_branch(b) for b in branches if isinstance(b, dict)]
bounded, omitted = limit_items(normalized, configured_limit=parsed.limit)
return {
"owner": parsed.owner,
"repo": parsed.repo,
"branches": bounded,
"count": len(bounded),
"omitted": omitted,
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to list branches: {exc}") from exc
async def get_branch_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""Get a single branch."""
parsed = GetBranchArgs.model_validate(arguments)
try:
branch = await gitea.get_branch(parsed.owner, parsed.repo, parsed.branch)
result = _normalize_branch(branch)
result.update({"owner": parsed.owner, "repo": parsed.repo})
return result
except (GiteaAuthenticationError, GiteaAuthorizationError):
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to get branch: {exc}") from exc
def _normalize_release(release: dict[str, Any]) -> dict[str, Any]:
"""Normalize a Gitea release payload."""
return {
"id": release.get("id", 0),
"tag_name": release.get("tag_name", ""),
"name": limit_text(str(release.get("name", ""))),
"draft": release.get("draft", False),
"prerelease": release.get("prerelease", False),
"body": limit_text(str(release.get("body", ""))),
"created_at": release.get("created_at", ""),
"published_at": release.get("published_at", ""),
"url": release.get("html_url", ""),
}
async def get_release_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""Get a release by id."""
parsed = GetReleaseArgs.model_validate(arguments)
try:
release = await gitea.get_release(parsed.owner, parsed.repo, parsed.release_id)
return _normalize_release(release)
except (GiteaAuthenticationError, GiteaAuthorizationError):
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to get release: {exc}") from exc
async def get_latest_release_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""Get the latest published release."""
parsed = LatestReleaseArgs.model_validate(arguments)
try:
release = await gitea.get_latest_release(parsed.owner, parsed.repo)
return _normalize_release(release)
except (GiteaAuthenticationError, GiteaAuthorizationError):
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to get latest release: {exc}") from exc
async def list_milestones_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""List repository milestones."""
parsed = ListMilestonesArgs.model_validate(arguments)
try:
milestones = await gitea.list_milestones(
parsed.owner, parsed.repo, state=parsed.state, page=parsed.page, limit=parsed.limit
)
normalized = [
{
"id": m.get("id", 0),
"title": limit_text(str(m.get("title", ""))),
"state": m.get("state", ""),
"open_issues": m.get("open_issues", 0),
"closed_issues": m.get("closed_issues", 0),
"due_on": m.get("due_on", ""),
}
for m in milestones
if isinstance(m, dict)
]
bounded, omitted = limit_items(normalized, configured_limit=parsed.limit)
return {
"owner": parsed.owner,
"repo": parsed.repo,
"state": parsed.state,
"milestones": bounded,
"count": len(bounded),
"omitted": omitted,
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to list milestones: {exc}") from exc
async def get_commit_status_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""Get the combined commit status for a ref/sha."""
parsed = CommitStatusArgs.model_validate(arguments)
try:
status = await gitea.get_commit_status(parsed.owner, parsed.repo, parsed.sha)
statuses_raw = status.get("statuses", []) if isinstance(status, dict) else []
statuses = [
{
"context": s.get("context", ""),
"state": s.get("status", s.get("state", "")),
"target_url": s.get("target_url", ""),
}
for s in statuses_raw
if isinstance(s, dict)
]
bounded, omitted = limit_items(statuses)
return {
"owner": parsed.owner,
"repo": parsed.repo,
"sha": parsed.sha,
"state": status.get("state", "") if isinstance(status, dict) else "",
"statuses": bounded,
"omitted": omitted,
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to get commit status: {exc}") from exc
def _normalize_repo_summary(repo: dict[str, Any]) -> dict[str, Any]:
"""Normalize a repository payload to a compact summary."""
owner = repo.get("owner", {})
return {
"owner": owner.get("login", "") if isinstance(owner, dict) else "",
"name": repo.get("name", ""),
"full_name": repo.get("full_name", ""),
"private": repo.get("private", False),
"description": limit_text(str(repo.get("description", ""))),
"url": repo.get("html_url", ""),
}
async def list_org_repositories_tool(
gitea: GiteaClient, arguments: dict[str, Any]
) -> dict[str, Any]:
"""List repositories belonging to an organization."""
parsed = ListOrgRepositoriesArgs.model_validate(arguments)
try:
repos = await gitea.list_org_repositories(parsed.org, page=parsed.page, limit=parsed.limit)
normalized = [_normalize_repo_summary(r) for r in repos if isinstance(r, dict)]
bounded, omitted = limit_items(normalized, configured_limit=parsed.limit)
return {
"org": parsed.org,
"repositories": bounded,
"count": len(bounded),
"omitted": omitted,
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to list org repositories: {exc}") from exc
async def list_organizations_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""List organizations the authenticated user belongs to."""
parsed = ListOrganizationsArgs.model_validate(arguments)
try:
orgs = await gitea.list_organizations(page=parsed.page, limit=parsed.limit)
normalized = [
{
"id": org.get("id", 0),
"name": org.get("username", org.get("name", "")),
"full_name": org.get("full_name", ""),
"description": limit_text(str(org.get("description", ""))),
}
for org in orgs
if isinstance(org, dict)
]
bounded, omitted = limit_items(normalized, configured_limit=parsed.limit)
return {
"organizations": bounded,
"count": len(bounded),
"omitted": omitted,
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to list organizations: {exc}") from exc
async def get_repo_languages_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""Get the language breakdown for a repository."""
parsed = RepoLanguagesArgs.model_validate(arguments)
try:
languages = await gitea.get_repo_languages(parsed.owner, parsed.repo)
cleaned = {str(name): value for name, value in languages.items() if isinstance(name, str)}
return {
"owner": parsed.owner,
"repo": parsed.repo,
"languages": cleaned,
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to get repository languages: {exc}") from exc
async def list_repo_topics_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]:
"""List the topics assigned to a repository."""
parsed = RepoTopicsArgs.model_validate(arguments)
try:
topics = await gitea.list_repo_topics(parsed.owner, parsed.repo)
bounded, omitted = limit_items([{"topic": t} for t in topics])
return {
"owner": parsed.owner,
"repo": parsed.repo,
"topics": [entry["topic"] for entry in bounded],
"count": len(bounded),
"omitted": omitted,
}
except (GiteaAuthenticationError, GiteaAuthorizationError):
raise
except GiteaError as exc:
raise RuntimeError(f"Failed to list repository topics: {exc}") from exc