Files
openrabbit/tools/ai-review/agents/pr_agent.py
latte e21ec5f57a
All checks were successful
Enterprise AI Code Review / ai-review (pull_request) Successful in 7s
feat: Add automatic PR summary generator
Implements automatic PR summary generation feature that analyzes pull
request diffs and generates comprehensive summaries.

Features:
- Auto-generates summaries for PRs with empty descriptions
- Manual trigger via @codebot summarize command in PR comments
- Structured output with change type, files affected, and impact assessment
- Configurable (enable/disable, comment vs description update)

Implementation:
- Added pr_summary.md prompt template for LLM
- Extended PRAgent with summary generation methods
- Added auto_summary configuration in config.yml
- Comprehensive test suite with 10 new tests
- Updated documentation in README.md and CLAUDE.md

Usage:
- Automatic: Opens PR with no description → auto-generates summary
- Manual: Comment '@codebot summarize' on any PR

Related: Issue #2 - Milestone 2 feature delivery
2025-12-29 10:15:08 +00:00

1024 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Pull Request Review Agent
Enhanced AI agent for comprehensive PR reviews with inline comments,
security scanning, and automatic label management.
"""
import re
from dataclasses import dataclass, field
from agents.base_agent import AgentContext, AgentResult, BaseAgent
@dataclass
class ReviewIssue:
    """A single issue found in the PR.

    Instances are produced either by the LLM review (_run_ai_review) or by
    the pattern-based security scanner (_run_security_scan).
    """

    # Path of the affected file (the diff's "b/" path).
    file: str
    # Line number in the new file version; None when the issue is file-level
    # or the LLM did not supply one.
    line: int | None
    severity: str  # HIGH, MEDIUM, LOW
    category: str  # Security, Correctness, Performance, etc.
    # Human-readable explanation of the problem.
    description: str
    # Suggested remediation; rendered verbatim in review comments.
    recommendation: str
    # Offending source excerpt (the scanner truncates it to 100 chars).
    code_snippet: str | None = None
@dataclass
class PRReviewResult:
    """Result of a PR review."""

    # Free-form review summary text (from the LLM, or an error message).
    summary: str
    # Issues reported by the LLM review.
    issues: list[ReviewIssue]
    # Aggregate severity label, e.g. HIGH/MEDIUM/LOW ("UNKNOWN" on LLM error).
    overall_severity: str
    # True when the AI recommends approval; False requests changes.
    approval: bool
    # Issues found by the regex security scanner (kept separate from `issues`
    # so they can be rendered in their own section).
    security_issues: list[ReviewIssue] = field(default_factory=list)
class PRAgent(BaseAgent):
    """Agent for handling pull request reviews."""

    # Marker specific to PR reviews: embedded as an invisible HTML comment in
    # the review summary so the comment can be found and updated in place.
    PR_AI_MARKER = "<!-- AI_PR_REVIEW -->"
    # Marker for auto-generated PR summary comments.
    PR_SUMMARY_MARKER = "<!-- AI_PR_SUMMARY -->"
def _get_label_config(self, category: str, key: str) -> dict:
    """Get full label configuration from config.

    Supports both the legacy format (a bare label-name string) and the
    current format (a dict with name/color/description/aliases).

    Args:
        category: Label category (type, priority, status)
        key: Label key within category (bug, high, ai_approved, etc.)

    Returns:
        Dict with name, color, description, aliases
    """
    raw = (
        self.config.get("labels", {})
        .get(category, {})
        .get(key, {})
    )

    # Legacy format: the config value is just the label name.
    if isinstance(raw, str):
        return {
            "name": raw,
            "color": "1d76db",  # Default blue
            "description": "",
            "aliases": [],
        }

    # Current dict format: fill in defaults for any missing fields.
    defaults = {
        "name": "",
        "color": "1d76db",
        "description": "",
        "aliases": [],
    }
    return {field_name: raw.get(field_name, fallback)
            for field_name, fallback in defaults.items()}
def can_handle(self, event_type: str, event_data: dict) -> bool:
    """Check if this agent handles the given event.

    Handles `pull_request` events whose action is in the configured event
    list, and `issue_comment` "created" events on PRs containing a
    review-again or summarize command.

    Args:
        event_type: Webhook event type ("pull_request", "issue_comment", ...).
        event_data: Raw webhook payload.

    Returns:
        True when this agent should process the event.
    """
    # Check if agent is enabled
    agent_config = self.config.get("agents", {}).get("pr", {})
    if not agent_config.get("enabled", True):
        return False

    if event_type == "pull_request":
        action = event_data.get("action", "")
        allowed_events = agent_config.get("events", ["opened", "synchronize"])
        return action in allowed_events

    # Handle issue comments on PRs (for review-again and summarize commands)
    if event_type == "issue_comment":
        if event_data.get("action", "") != "created":
            return False
        # Guard against body being None in the payload.
        comment_body = (event_data.get("comment", {}).get("body") or "").lower()
        # BUG FIX: the comment body was lowercased but the configured prefix
        # was not, so a prefix like "@CodeBot" could never match. Lowercase
        # both sides.
        mention_prefix = (
            self.config.get("interaction", {})
            .get("mention_prefix", "@codebot")
            .lower()
        )
        # Only handle if this is a PR (issue payloads carry a pull_request
        # key when the "issue" is actually a PR)
        is_pr = event_data.get("issue", {}).get("pull_request") is not None
        has_review_again = f"{mention_prefix} review-again" in comment_body
        has_summarize = f"{mention_prefix} summarize" in comment_body
        return is_pr and (has_review_again or has_summarize)

    return False
def execute(self, context: AgentContext) -> AgentResult:
    """Execute the PR review agent.

    For issue_comment events, dispatches to the summarize / review-again
    command handlers. For pull_request events, optionally auto-generates a
    summary (empty description + "opened" action), then runs the full
    review pipeline: diff -> security scan -> LLM review -> inline
    comments -> summary comment -> labels.

    Args:
        context: Agent context carrying event data and repo coordinates.

    Returns:
        AgentResult with severity/approval statistics and actions taken.
    """
    # Check if this is a comment-based command
    if context.event_type == "issue_comment":
        # Guard against body being None; lowercase once for matching.
        comment_body = (
            context.event_data.get("comment", {}).get("body") or ""
        ).lower()
        # BUG FIX: lowercase the configured prefix as well so matching works
        # regardless of how the prefix is cased in config (see can_handle).
        mention_prefix = (
            self.config.get("interaction", {})
            .get("mention_prefix", "@codebot")
            .lower()
        )
        if f"{mention_prefix} summarize" in comment_body:
            return self._handle_summarize_command(context)
        elif f"{mention_prefix} review-again" in comment_body:
            return self._handle_review_again(context)

    pr = context.event_data.get("pull_request", {})
    pr_number = pr.get("number")
    self.logger.info(f"Reviewing PR #{pr_number}: {pr.get('title')}")
    actions_taken = []

    # Fetch the agent config once; it is needed in several steps below.
    agent_config = self.config.get("agents", {}).get("pr", {})

    # Check if PR has empty description and auto-summary is enabled.
    # BUG FIX: pr.get("body", "") returns None when the API sends body=null
    # (the key exists with a null value), which made .strip() raise
    # AttributeError. Coalesce to "" first.
    pr_body = (pr.get("body") or "").strip()
    auto_summary_enabled = agent_config.get("auto_summary", {}).get("enabled", True)
    if (
        not pr_body
        and auto_summary_enabled
        and context.event_data.get("action") == "opened"
    ):
        # Generate and post summary for empty PR descriptions
        summary_result = self._generate_pr_summary(
            context.owner, context.repo, pr_number
        )
        if summary_result:
            actions_taken.append("Generated PR summary for empty description")
        # Don't return here - continue with regular review

    # Step 1: Get PR diff
    diff = self._get_diff(context.owner, context.repo, pr_number)
    if not diff.strip():
        return AgentResult(
            success=True,
            message="PR has no changes to review",
        )

    # Step 2: Parse changed files
    changed_files = self._parse_diff_files(diff)

    # Step 3: Run security scan if enabled
    security_issues = []
    if agent_config.get("security_scan", True):
        security_issues = self._run_security_scan(changed_files, diff)
        if security_issues:
            actions_taken.append(f"Found {len(security_issues)} security issues")

    # Step 4: Run AI review
    review_result = self._run_ai_review(diff, context, security_issues)

    # Step 5: Post inline comments if enabled
    if agent_config.get("inline_comments", True) and review_result.issues:
        inline_count = self._post_inline_comments(
            context.owner, context.repo, pr_number, review_result
        )
        actions_taken.append(f"Posted {inline_count} inline comments")

    # Step 6: Post summary comment (upsert so re-runs update in place)
    summary_comment = self._generate_summary_comment(review_result)
    self.upsert_comment(
        context.owner,
        context.repo,
        pr_number,
        summary_comment,
        marker=self.PR_AI_MARKER,
    )
    actions_taken.append("Posted summary comment")

    # Step 7: Apply labels
    labels_applied = self._apply_review_labels(
        context.owner, context.repo, pr_number, review_result
    )
    if labels_applied:
        actions_taken.append(f"Applied labels: {labels_applied}")

    return AgentResult(
        success=True,
        message=f"Reviewed PR #{pr_number}: {review_result.overall_severity} severity",
        data={
            "severity": review_result.overall_severity,
            "approval": review_result.approval,
            "issues_count": len(review_result.issues),
            "security_issues_count": len(review_result.security_issues),
        },
        actions_taken=actions_taken,
    )
def _get_diff(self, owner: str, repo: str, pr_number: int) -> str:
    """Fetch the PR diff, truncated to the configured maximum line count.

    Returns an empty string when the diff cannot be fetched.
    """
    max_lines = self.config.get("review", {}).get("max_diff_lines", 800)
    try:
        diff = self.gitea.get_pull_request_diff(owner, repo, pr_number)
    except Exception as e:
        self.logger.error(f"Failed to get diff: {e}")
        return ""
    diff_lines = diff.splitlines()
    if len(diff_lines) <= max_lines:
        return diff
    # Oversized diff: keep only the first max_lines lines.
    return "\n".join(diff_lines[:max_lines])
def _parse_diff_files(self, diff: str) -> dict[str, str]:
    """Split a unified diff into a mapping of file path -> diff body.

    File boundaries are the "diff --git a/path b/path" header lines; the
    path is taken from the "b/" side. Header lines themselves are not
    included in the stored bodies.
    """
    files: dict[str, str] = {}
    active_path: str | None = None
    chunk: list[str] = []

    for raw_line in diff.splitlines():
        if raw_line.startswith("diff --git"):
            # Flush the previous file before starting the next one.
            if active_path:
                files[active_path] = "\n".join(chunk)
            header_match = re.search(r"b/(.+)$", raw_line)
            if header_match:
                active_path = header_match.group(1)
                chunk = []
        elif active_path:
            chunk.append(raw_line)

    # Flush the trailing file, if any.
    if active_path:
        files[active_path] = "\n".join(chunk)
    return files
def _run_security_scan(
    self, changed_files: dict[str, str], diff: str
) -> list[ReviewIssue]:
    """Run security pattern scanning on the added lines of the diff.

    Args:
        changed_files: Mapping of file path -> per-file diff body
            (as produced by _parse_diff_files).
        diff: Full diff text (unused; kept for interface compatibility).

    Returns:
        List of ReviewIssue for every pattern match on an added line,
        with line numbers referring to the new file version.
    """
    # Security patterns to detect
    patterns = [
        {
            "name": "Hardcoded Secrets",
            "pattern": r'(?i)(api_key|apikey|secret|password|token|auth)\s*[=:]\s*["\'][^"\']{8,}["\']',
            "severity": "HIGH",
            "category": "Security",
            "description": "Potential hardcoded secret or API key detected",
            "recommendation": "Move secrets to environment variables or a secrets manager",
        },
        {
            "name": "SQL Injection",
            "pattern": r'(?i)(execute|query)\s*\([^)]*\+[^)]*\)|f["\'].*\{.*\}.*(?:SELECT|INSERT|UPDATE|DELETE)',
            "severity": "HIGH",
            "category": "Security",
            "description": "Potential SQL injection vulnerability - string concatenation in query",
            "recommendation": "Use parameterized queries or prepared statements",
        },
        {
            "name": "Hardcoded IP",
            "pattern": r"\b(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b",
            "severity": "LOW",
            "category": "Security",
            "description": "Hardcoded IP address detected",
            "recommendation": "Consider using configuration or DNS names instead",
        },
        {
            "name": "Eval Usage",
            "pattern": r"\beval\s*\(",
            "severity": "HIGH",
            "category": "Security",
            "description": "Use of eval() detected - potential code injection risk",
            "recommendation": "Avoid eval() - use safer alternatives like ast.literal_eval() for Python",
        },
        {
            "name": "Shell Injection",
            "pattern": r"(?i)(?:subprocess\.call|os\.system|shell\s*=\s*True)",
            "severity": "MEDIUM",
            "category": "Security",
            "description": "Potential shell command execution - verify input is sanitized",
            "recommendation": "Use subprocess with shell=False and pass arguments as a list",
        },
    ]
    # Compile each pattern once; they are applied to every added line.
    for pattern_def in patterns:
        pattern_def["compiled"] = re.compile(pattern_def["pattern"])

    issues: list[ReviewIssue] = []
    for filename, content in changed_files.items():
        # Collect (new-file line number, text) for added lines only.
        added_lines: list[tuple[int, str]] = []
        current_line = 0
        for line in content.splitlines():
            if line.startswith("@@"):
                # Parse starting line number from "@@ -x,y +a,b @@"
                match = re.search(r"\+(\d+)", line)
                if match:
                    current_line = int(match.group(1)) - 1
            elif line.startswith("+") and not line.startswith("+++"):
                current_line += 1
                added_lines.append((current_line, line[1:]))
            elif line.startswith(" "):
                # Context line: advances the new-file line counter.
                # BUG FIX: previously ANY line not starting with "-" advanced
                # the counter, so markers like "\ No newline at end of file"
                # skewed the line numbers of subsequent additions.
                current_line += 1

        # Check every pattern against every added line.
        for line_num, line_content in added_lines:
            for pattern_def in patterns:
                if pattern_def["compiled"].search(line_content):
                    issues.append(
                        ReviewIssue(
                            file=filename,
                            line=line_num,
                            severity=pattern_def["severity"],
                            category=pattern_def["category"],
                            description=pattern_def["description"],
                            recommendation=pattern_def["recommendation"],
                            code_snippet=line_content.strip()[:100],
                        )
                    )
    return issues
def _run_ai_review(
    self,
    diff: str,
    context: AgentContext,
    security_issues: list[ReviewIssue],
) -> PRReviewResult:
    """Run the LLM-based code review over the diff.

    Security-scan findings (first five) are appended to the prompt so the
    LLM does not duplicate them. On any failure a non-approving
    PRReviewResult with severity UNKNOWN is returned instead of raising.
    """
    prompt_template = self.load_prompt("base")

    # Add security context if issues were found
    security_context = ""
    if security_issues:
        bullet_lines = "".join(
            f"- [{issue.severity}] {issue.file}:{issue.line} - {issue.description}\n"
            for issue in security_issues[:5]  # Limit to first 5
        )
        security_context = (
            "\n\nSECURITY SCAN RESULTS (already detected):\n" + bullet_lines
        )

    prompt = f"{prompt_template}\n{security_context}\nDIFF:\n{diff}"
    try:
        result = self.call_llm_json(prompt)
        parsed_issues = [
            ReviewIssue(
                file=item.get("file", "unknown"),
                line=item.get("line"),
                severity=item.get("severity", "MEDIUM"),
                category=item.get("category", "General"),
                description=item.get("description", ""),
                recommendation=item.get("recommendation", ""),
                code_snippet=item.get("code_snippet"),
            )
            for item in result.get("issues", [])
        ]
        return PRReviewResult(
            summary=result.get("summary", "Review completed"),
            issues=parsed_issues,
            overall_severity=result.get("overall_severity", "LOW"),
            approval=result.get("approval", True),
            security_issues=security_issues,
        )
    except Exception as e:
        self.logger.error(f"AI review failed: {e}")
        return PRReviewResult(
            summary=f"AI review encountered an error: {e}",
            issues=[],
            overall_severity="UNKNOWN",
            approval=False,
            security_issues=security_issues,
        )
def _post_inline_comments(
    self,
    owner: str,
    repo: str,
    pr_number: int,
    review: PRReviewResult,
) -> int:
    """Post inline comments for every issue that has both a file and line.

    Returns the number of comments posted (capped at 10), or 0 on failure.
    """
    comments = [
        {
            "path": issue.file,
            "line": issue.line,
            "body": (
                f"**[{issue.severity}] {issue.category}**\n\n"
                f"{issue.description}\n\n"
                f"**Recommendation:** {issue.recommendation}"
            ),
        }
        for issue in review.issues + review.security_issues
        if issue.line and issue.file
    ]
    if not comments:
        return 0
    try:
        # Use Gitea's pull request review API for inline comments
        self.gitea.create_pull_request_review(
            owner=owner,
            repo=repo,
            index=pr_number,
            body="AI Code Review - Inline Comments",
            event="COMMENT",
            comments=comments[:10],  # Limit to 10 inline comments
        )
    except Exception as e:
        self.logger.warning(f"Failed to post inline comments: {e}")
        return 0
    return min(len(comments), 10)
def _generate_summary_comment(self, review: PRReviewResult) -> str:
    """Render the markdown summary comment for a PR review.

    Sections: disclaimer, LLM summary, severity table, security issues
    (max 5), other findings (max 10), and the overall verdict.
    """
    all_issues = review.issues + review.security_issues
    counts = {
        sev: sum(1 for issue in all_issues if issue.severity == sev)
        for sev in ("HIGH", "MEDIUM", "LOW")
    }

    parts = [
        f"{self.AI_DISCLAIMER}",
        "",
        "## AI Code Review",
        "",
        review.summary,
        "",
        "### Summary",
        "",
        "| Severity | Count |",
        "|----------|-------|",
        f"| HIGH | {counts['HIGH']} |",
        f"| MEDIUM | {counts['MEDIUM']} |",
        f"| LOW | {counts['LOW']} |",
        "",
    ]

    # Security issues section (first five only)
    if review.security_issues:
        parts += ["### Security Issues", ""]
        parts += [
            f"- **[{issue.severity}]** `{issue.file}:{issue.line}` - {issue.description}"
            for issue in review.security_issues[:5]
        ]
        parts.append("")

    # Remaining (non-security) findings, capped at ten displayed
    remaining = [i for i in review.issues if i not in review.security_issues]
    if remaining:
        parts += ["### Review Findings", ""]
        for issue in remaining[:10]:
            location = (
                f"`{issue.file}:{issue.line}`" if issue.line else f"`{issue.file}`"
            )
            parts.append(f"- **[{issue.severity}]** {location} - {issue.description}")
        if len(remaining) > 10:
            parts.append(f"- ...and {len(remaining) - 10} more issues")
        parts.append("")

    # Verdict
    parts.append("---")
    parts.append(f"**Overall Severity:** `{review.overall_severity}`")
    verdict = "Approve" if review.approval else "Changes Requested"
    parts.append(f"**AI Recommendation:** {verdict}")
    return "\n".join(parts)
def _apply_review_labels(
    self,
    owner: str,
    repo: str,
    pr_number: int,
    review: PRReviewResult,
) -> list[str]:
    """Apply labels based on review result.

    BUG FIX: in the original file this method ended abruptly after fetching
    the repository labels; its tail (building and applying the status label)
    was displaced to after _format_review_update, where it referenced names
    out of scope. The full method is restored here.

    Args:
        owner: Repository owner.
        repo: Repository name.
        pr_number: PR number.
        review: Review result whose approval flag selects the label.

    Returns:
        Names of the labels that were applied ([] on failure or no-op).
    """
    try:
        repo_labels = self.gitea.get_repo_labels(owner, repo)
        # Map label name -> id (the add-labels API takes ids)
        label_map = {label["name"]: label["id"] for label in repo_labels}
    except Exception as e:
        self.logger.warning(f"Failed to get repo labels: {e}")
        return []

    labels_to_add = []
    # Add approval/changes required label.
    # Use helper to support both old string and new dict format.
    if review.approval:
        label_config = self._get_label_config("status", "ai_approved")
    else:
        label_config = self._get_label_config("status", "ai_changes_required")
    label_name = label_config.get("name", "")
    # Only apply labels that actually exist in the repository.
    if label_name and label_name in label_map:
        labels_to_add.append(label_map[label_name])

    if labels_to_add:
        try:
            self.gitea.add_issue_labels(owner, repo, pr_number, labels_to_add)
            return [
                name
                for name, label_id in label_map.items()
                if label_id in labels_to_add
            ]
        except Exception as e:
            self.logger.warning(f"Failed to add labels: {e}")
    return []
def _handle_review_again(self, context: AgentContext) -> AgentResult:
    """Re-run PR review on current state.

    Triggered by an "@<prefix> review-again" comment on a PR. Re-runs the
    full review pipeline, diffs the findings against the previous AI review
    comment (if one exists), and updates that comment in place.

    Args:
        context: Agent context; event_data is an issue_comment payload.

    Returns:
        AgentResult including resolved/new finding counts in `data`.
    """
    issue = context.event_data.get("issue", {})
    pr_number = issue.get("number")
    comment_author = (
        context.event_data.get("comment", {}).get("user", {}).get("login", "user")
    )
    self.logger.info(f"Re-reviewing PR #{pr_number} at user request")
    # Get previous review comment (None when this is the first review)
    previous_comment = self._find_previous_review(
        context.owner, context.repo, pr_number
    )
    previous_findings = []
    if previous_comment:
        previous_findings = self._parse_review_comment(previous_comment)
    # Run new review (reuse existing review logic)
    actions_taken = []
    # Step 1: Get PR diff
    diff = self._get_diff(context.owner, context.repo, pr_number)
    if not diff.strip():
        # Nothing to review: reply directly rather than updating the
        # existing review comment.
        response = f"@{comment_author}\n\n{self.AI_DISCLAIMER}\n\n**🔄 Re-review Requested**\n\nPR has no changes to review."
        self.gitea.create_issue_comment(
            context.owner, context.repo, pr_number, response
        )
        return AgentResult(
            success=True,
            message="PR has no changes to review",
        )
    # Step 2: Parse changed files
    changed_files = self._parse_diff_files(diff)
    # Step 3: Run security scan if enabled
    security_issues = []
    agent_config = self.config.get("agents", {}).get("pr", {})
    if agent_config.get("security_scan", True):
        security_issues = self._run_security_scan(changed_files, diff)
    # Step 4: Run AI review
    review_result = self._run_ai_review(diff, context, security_issues)
    # Step 5: Compare with previous review (resolved / new / still present)
    current_findings = self._extract_findings_from_review(review_result)
    diff_result = self._compare_reviews(previous_findings, current_findings)
    # Step 6: Generate updated review with comparison
    updated_review = self._format_review_update(
        review_result, diff_result, comment_author
    )
    # Step 7: Update existing comment (or create new one)
    self.upsert_comment(
        context.owner,
        context.repo,
        pr_number,
        updated_review,
        marker=self.PR_AI_MARKER,
    )
    actions_taken.append("Updated review comment")
    # Step 8: Update PR labels
    labels_applied = self._apply_review_labels(
        context.owner, context.repo, pr_number, review_result
    )
    if labels_applied:
        actions_taken.append(f"Updated labels: {labels_applied}")
    return AgentResult(
        success=True,
        message=f"Re-reviewed PR #{pr_number}: {review_result.overall_severity} severity",
        data={
            "severity": review_result.overall_severity,
            "approval": review_result.approval,
            "issues_count": len(review_result.issues),
            "security_issues_count": len(review_result.security_issues),
            "resolved_count": len(diff_result.get("resolved", [])),
            "new_count": len(diff_result.get("new", [])),
        },
        actions_taken=actions_taken,
    )
def _find_previous_review(
    self, owner: str, repo: str, pr_number: int
) -> str | None:
    """Return the body of the previous AI review comment, if any."""
    comment_id = self.find_ai_comment(
        owner, repo, pr_number, marker=self.PR_AI_MARKER
    )
    if not comment_id:
        return None
    # Look up the comment's body by its id.
    all_comments = self.gitea.list_issue_comments(owner, repo, pr_number)
    return next(
        (c.get("body", "") for c in all_comments if c.get("id") == comment_id),
        None,
    )
def _parse_review_comment(self, comment_text: str) -> list[dict]:
    """Parse a previous review comment back into structured findings.

    Matches bullet lines of the form:
        **[HIGH]** `src/file.py:45` - Description

    Returns:
        List of findings with severity, file, line, description.
    """
    if not comment_text:
        return []
    pattern = r"\*\*\[(\w+)\]\*\*\s+`([^:]+):(\d+)`\s+-\s+(.+?)(?:\n|$)"
    return [
        {
            "severity": m.group(1),
            "file": m.group(2),
            "line": int(m.group(3)),
            "description": m.group(4).strip(),
        }
        for m in re.finditer(pattern, comment_text)
    ]
def _extract_findings_from_review(self, review: PRReviewResult) -> list[dict]:
    """Flatten a PRReviewResult into dicts comparable with parsed findings."""
    return [
        {
            "severity": issue.severity,
            "file": issue.file,
            # Normalize missing line numbers to 0 for stable keys.
            "line": issue.line or 0,
            "description": issue.description,
            "category": issue.category,
        }
        for issue in review.issues + review.security_issues
    ]
def _finding_key(self, finding: dict) -> str:
    """Build a stable "file:line:description-prefix" key for a finding.

    Only the first 50 characters of the description are used, so minor
    rewording at the tail still matches.
    """
    return (
        f'{finding.get("file", "unknown")}'
        f':{finding.get("line", 0)}'
        f':{finding.get("description", "")[:50]}'
    )
def _compare_reviews(
    self, previous_findings: list[dict], new_findings: list[dict]
) -> dict:
    """Compare previous and new review findings.

    Findings are matched on the key produced by _finding_key
    (file:line:description-prefix).

    Returns:
        {
            "resolved": [...],        # Issues that disappeared
            "new": [...],             # New issues found
            "still_present": [...],   # Issues that remain
            "severity_changed": {...} # key -> {"old", "new", "finding"}
        }
    """
    prev_by_key = {self._finding_key(f): f for f in previous_findings}
    curr_by_key = {self._finding_key(f): f for f in new_findings}

    resolved = [f for key, f in prev_by_key.items() if key not in curr_by_key]
    new = [f for key, f in curr_by_key.items() if key not in prev_by_key]
    still_present = [f for key, f in curr_by_key.items() if key in prev_by_key]

    # Track findings whose severity label changed between runs.
    severity_changed = {}
    for key, prev_finding in prev_by_key.items():
        curr_finding = curr_by_key.get(key)
        if curr_finding is None:
            continue
        old_sev = prev_finding.get("severity")
        new_sev = curr_finding.get("severity")
        if old_sev != new_sev:
            severity_changed[key] = {
                "old": old_sev,
                "new": new_sev,
                "finding": curr_finding,
            }

    return {
        "resolved": resolved,
        "new": new,
        "still_present": still_present,
        "severity_changed": severity_changed,
    }
def _format_review_update(
    self, review: PRReviewResult, diff: dict, comment_author: str
) -> str:
    """Format a re-review comment, including a comparison to the previous run.

    Args:
        review: The fresh review result.
        diff: Comparison dict from _compare_reviews
            (resolved / new / still_present / severity_changed).
        comment_author: Login of the user who requested the re-review;
            mentioned at the top of the comment.

    Returns:
        Markdown body for the updated review comment.
    """
    lines = [f"@{comment_author}\n"]
    lines.append(f"{self.AI_DISCLAIMER}\n")
    lines.append("**🔄 Re-review Requested**\n")
    lines.append("---\n")
    lines.append("## AI Code Review (Updated)\n")
    # Summary of changes: reconstruct previous/current totals from the
    # comparison buckets.
    prev_total = len(diff["resolved"]) + len(diff["still_present"])
    curr_total = len(diff["new"]) + len(diff["still_present"])
    if prev_total > 0:
        lines.append(f"**Previous Review:** {prev_total} issues")
        lines.append(f"**Current Review:** {curr_total} issues\n")
    else:
        lines.append("**First Review** - No previous review found\n")
    # Changes section (only if there was a previous review)
    if prev_total > 0:
        lines.append("### Changes from Previous Review\n")
        if diff["resolved"]:
            lines.append(f"**✅ Resolved ({len(diff['resolved'])}):**")
            for finding in diff["resolved"][:5]:  # Show max 5
                lines.append(
                    f"- **[{finding['severity']}]** `{finding['file']}:{finding['line']}` - {finding['description']}"
                )
            if len(diff["resolved"]) > 5:
                lines.append(f"- ... and {len(diff['resolved']) - 5} more")
            lines.append("")
        if diff["new"]:
            lines.append(f"**⚠️ New Issues ({len(diff['new'])}):**")
            for finding in diff["new"][:5]:
                lines.append(
                    f"- **[{finding['severity']}]** `{finding['file']}:{finding['line']}` - {finding['description']}"
                )
            if len(diff["new"]) > 5:
                lines.append(f"- ... and {len(diff['new']) - 5} more")
            lines.append("")
        if diff["severity_changed"]:
            lines.append(
                f"**🔄 Severity Changed ({len(diff['severity_changed'])}):**"
            )
            # Show at most 5 severity transitions.
            for key, change in list(diff["severity_changed"].items())[:5]:
                finding = change["finding"]
                lines.append(
                    f"- `{finding['file']}:{finding['line']}` - {change['old']}{change['new']}"
                )
            lines.append("")
    # Summary table of severity counts across all issues.
    all_issues = review.issues + review.security_issues
    high = sum(1 for i in all_issues if i.severity == "HIGH")
    medium = sum(1 for i in all_issues if i.severity == "MEDIUM")
    low = sum(1 for i in all_issues if i.severity == "LOW")
    lines.append("### Summary\n")
    lines.append("| Severity | Count |")
    lines.append("|----------|-------|")
    lines.append(f"| HIGH | {high} |")
    lines.append(f"| MEDIUM | {medium} |")
    lines.append(f"| LOW | {low} |")
    lines.append("")
    # Security issues section (if any), first five only.
    if review.security_issues:
        lines.append("### Security Issues\n")
        for issue in review.security_issues[:5]:
            loc = (
                f"`{issue.file}:{issue.line}`" if issue.line else f"`{issue.file}`"
            )
            lines.append(f"- **[{issue.severity}]** {loc} - {issue.description}")
        if len(review.security_issues) > 5:
            lines.append(f"- ... and {len(review.security_issues) - 5} more")
        lines.append("")
    # Other issues (limit display to ten).
    other_issues = [i for i in review.issues if i not in review.security_issues]
    if other_issues:
        lines.append("### Review Findings\n")
        for issue in other_issues[:10]:
            loc = (
                f"`{issue.file}:{issue.line}`" if issue.line else f"`{issue.file}`"
            )
            lines.append(f"- **[{issue.severity}]** {loc} - {issue.description}")
        if len(other_issues) > 10:
            lines.append(f"- ... and {len(other_issues) - 10} more issues")
        lines.append("")
    # Verdict
    lines.append("---")
    lines.append(f"**Overall Severity:** `{review.overall_severity}`")
    if review.approval:
        lines.append("**AI Recommendation:** Approved ✅")
    else:
        lines.append("**AI Recommendation:** Changes Requested ⚠️")
    return "\n".join(lines)
# NOTE(review): The statements below appear to be the displaced tail of
# `_apply_review_labels` — they reference `review`, `label_map`, `owner`,
# `repo`, and `pr_number`, which exist only in that method's scope, and
# `_apply_review_labels` as defined above ends abruptly right after building
# `label_map`. Positioned here (after `_format_review_update`'s return) this
# code cannot execute correctly and should be moved back into
# `_apply_review_labels`. Preserved byte-for-byte pending that move.
labels_to_add = []
# Add approval/changes required label
# Use helper to support both old string and new dict format
if review.approval:
    label_config = self._get_label_config("status", "ai_approved")
else:
    label_config = self._get_label_config("status", "ai_changes_required")
label_name = label_config.get("name", "")
if label_name and label_name in label_map:
    labels_to_add.append(label_map[label_name])
if labels_to_add:
    try:
        self.gitea.add_issue_labels(owner, repo, pr_number, labels_to_add)
        return [name for name, id in label_map.items() if id in labels_to_add]
    except Exception as e:
        self.logger.warning(f"Failed to add labels: {e}")
return []
def _generate_pr_summary(self, owner: str, repo: str, pr_number: int) -> bool:
    """Generate and post a summary for a PR.

    Fetches the diff, asks the LLM (via the "pr_summary" prompt) for a
    structured summary, and either posts it as a comment or writes it into
    the PR description, depending on config.

    Args:
        owner: Repository owner
        repo: Repository name
        pr_number: PR number

    Returns:
        True if summary was generated successfully, False otherwise
    """
    try:
        # Get PR diff (already truncated by _get_diff's max_diff_lines)
        diff = self._get_diff(owner, repo, pr_number)
        if not diff.strip():
            self.logger.info(f"No diff to summarize for PR #{pr_number}")
            return False
        # Load summary prompt and append the diff
        prompt_template = self.load_prompt("pr_summary")
        prompt = f"{prompt_template}\n{diff}"
        # Call LLM to generate summary (expects structured JSON)
        result = self.call_llm_json(prompt)
        # Format the summary comment
        summary_comment = self._format_pr_summary(result)
        # Post as first comment (or update PR description based on config)
        agent_config = self.config.get("agents", {}).get("pr", {})
        auto_summary_config = agent_config.get("auto_summary", {})
        post_as_comment = auto_summary_config.get("post_as_comment", True)
        if post_as_comment:
            # Post as comment
            self.gitea.create_issue_comment(owner, repo, pr_number, summary_comment)
            self.logger.info(f"Posted PR summary as comment for PR #{pr_number}")
        else:
            # Update PR description (requires different API call)
            # Note: Gitea API may not support updating PR description
            # In that case, fall back to posting as comment
            try:
                self.gitea.update_pull_request(
                    owner, repo, pr_number, body=summary_comment
                )
                self.logger.info(
                    f"Updated PR description with summary for PR #{pr_number}"
                )
            except Exception as e:
                self.logger.warning(
                    f"Could not update PR description, posting as comment: {e}"
                )
                self.gitea.create_issue_comment(
                    owner, repo, pr_number, summary_comment
                )
        return True
    except Exception as e:
        # Best-effort feature: never let summary generation break the caller.
        self.logger.error(f"Failed to generate PR summary: {e}")
        return False
def _format_pr_summary(self, summary_data: dict) -> str:
    """Format the PR summary data into a readable comment.

    Args:
        summary_data: JSON data from LLM containing summary information
            (keys used: summary, change_type, key_changes{added,modified,
            removed}, files_affected, impact{scope,description}).

    Returns:
        Formatted markdown comment
    """
    lines = [
        self.AI_DISCLAIMER,
        "",
        "## 📋 Pull Request Summary",
        "",
        summary_data.get("summary", "Summary unavailable"),
        "",
    ]
    # Change type
    change_type = summary_data.get("change_type", "Unknown")
    # NOTE(review): the "Feature" value is an empty string — possibly an
    # emoji stripped in transit/encoding; confirm the intended glyph.
    change_type_emoji = {
        "Feature": "",
        "Bugfix": "🐛",
        "Refactor": "♻️",
        "Documentation": "📚",
        "Testing": "🧪",
        "Mixed": "🔀",
    }
    emoji = change_type_emoji.get(change_type, "🔀")
    lines.append(f"**Type:** {emoji} {change_type}")
    lines.append("")
    # Key changes, grouped into added / modified / removed bullet lists
    key_changes = summary_data.get("key_changes", {})
    if key_changes:
        lines.append("## Changes")
        lines.append("")
        added = key_changes.get("added", [])
        if added:
            lines.append("**✅ Added:**")
            for item in added:
                lines.append(f"- {item}")
            lines.append("")
        modified = key_changes.get("modified", [])
        if modified:
            lines.append("**📝 Modified:**")
            for item in modified:
                lines.append(f"- {item}")
            lines.append("")
        removed = key_changes.get("removed", [])
        if removed:
            lines.append("**❌ Removed:**")
            for item in removed:
                lines.append(f"- {item}")
            lines.append("")
    # Files affected
    files = summary_data.get("files_affected", [])
    if files:
        lines.append("## Files Affected")
        lines.append("")
        for file_info in files[:10]:  # Limit to first 10 files
            path = file_info.get("path", "unknown")
            desc = file_info.get("description", "")
            change_type = file_info.get("change_type", "modified")
            # NOTE(review): "added"/"deleted" icons are empty strings —
            # possibly stripped emoji; confirm intended glyphs.
            type_icon = {"added": "", "modified": "📝", "deleted": ""}
            icon = type_icon.get(change_type, "📝")
            lines.append(f"- {icon} `{path}` - {desc}")
        if len(files) > 10:
            lines.append(f"- ... and {len(files) - 10} more files")
        lines.append("")
    # Impact assessment (scope is expected to be small/medium/large)
    impact = summary_data.get("impact", {})
    if impact:
        scope = impact.get("scope", "unknown")
        description = impact.get("description", "")
        scope_emoji = {"small": "🟢", "medium": "🟡", "large": "🔴"}
        emoji = scope_emoji.get(scope, "")
        lines.append("## Impact")
        lines.append(f"{emoji} **Scope:** {scope.capitalize()}")
        lines.append(f"{description}")
    return "\n".join(lines)
def _handle_summarize_command(self, context: AgentContext) -> AgentResult:
    """Handle the "@<prefix> summarize" command posted on a PR.

    Args:
        context: Agent context with event data

    Returns:
        AgentResult with success status and actions taken
    """
    issue = context.event_data.get("issue", {})
    pr_number = issue.get("number")
    comment_author = (
        context.event_data.get("comment", {}).get("user", {}).get("login", "user")
    )
    self.logger.info(f"Generating PR summary for PR #{pr_number} at user request")

    # Generate and post summary; on success we are done.
    if self._generate_pr_summary(context.owner, context.repo, pr_number):
        return AgentResult(
            success=True,
            message=f"Generated PR summary for PR #{pr_number}",
            actions_taken=["Posted PR summary comment"],
        )

    # Failure path: tell the requester why this might have happened.
    error_msg = "".join(
        (
            f"@{comment_author}\n\n",
            f"{self.AI_DISCLAIMER}\n\n",
            "**⚠️ Summary Generation Failed**\n\n",
            "I was unable to generate a summary for this PR. ",
            "This could be because:\n",
            "- The PR has no changes\n",
            "- There was an error accessing the diff\n",
            "- The LLM service is unavailable",
        )
    )
    self.gitea.create_issue_comment(
        context.owner, context.repo, pr_number, error_msg
    )
    return AgentResult(
        success=False,
        message=f"Failed to generate PR summary for PR #{pr_number}",
        error="Summary generation failed",
    )