All checks were successful
Enterprise AI Code Review / ai-review (pull_request) Successful in 7s
Implements automatic PR summary generation feature that analyzes pull request diffs and generates comprehensive summaries. Features: - Auto-generates summaries for PRs with empty descriptions - Manual trigger via @codebot summarize command in PR comments - Structured output with change type, files affected, and impact assessment - Configurable (enable/disable, comment vs description update) Implementation: - Added pr_summary.md prompt template for LLM - Extended PRAgent with summary generation methods - Added auto_summary configuration in config.yml - Comprehensive test suite with 10 new tests - Updated documentation in README.md and CLAUDE.md Usage: - Automatic: Opens PR with no description → auto-generates summary - Manual: Comment '@codebot summarize' on any PR Related: Issue #2 - Milestone 2 feature delivery
1024 lines
38 KiB
Python
1024 lines
38 KiB
Python
"""Pull Request Review Agent
|
||
|
||
Enhanced AI agent for comprehensive PR reviews with inline comments,
|
||
security scanning, and automatic label management.
|
||
"""
|
||
|
||
import re
|
||
from dataclasses import dataclass, field
|
||
|
||
from agents.base_agent import AgentContext, AgentResult, BaseAgent
|
||
|
||
|
||
@dataclass
|
||
class ReviewIssue:
|
||
"""A single issue found in the PR."""
|
||
|
||
file: str
|
||
line: int | None
|
||
severity: str # HIGH, MEDIUM, LOW
|
||
category: str # Security, Correctness, Performance, etc.
|
||
description: str
|
||
recommendation: str
|
||
code_snippet: str | None = None
|
||
|
||
|
||
@dataclass
|
||
class PRReviewResult:
|
||
"""Result of a PR review."""
|
||
|
||
summary: str
|
||
issues: list[ReviewIssue]
|
||
overall_severity: str
|
||
approval: bool
|
||
security_issues: list[ReviewIssue] = field(default_factory=list)
|
||
|
||
|
||
class PRAgent(BaseAgent):
|
||
"""Agent for handling pull request reviews."""
|
||
|
||
# Marker specific to PR reviews
|
||
PR_AI_MARKER = "<!-- AI_PR_REVIEW -->"
|
||
PR_SUMMARY_MARKER = "<!-- AI_PR_SUMMARY -->"
|
||
|
||
def _get_label_config(self, category: str, key: str) -> dict:
|
||
"""Get full label configuration from config.
|
||
|
||
Args:
|
||
category: Label category (type, priority, status)
|
||
key: Label key within category (bug, high, ai_approved, etc.)
|
||
|
||
Returns:
|
||
Dict with name, color, description, aliases
|
||
"""
|
||
labels_config = self.config.get("labels", {})
|
||
category_config = labels_config.get(category, {})
|
||
label_config = category_config.get(key, {})
|
||
|
||
# Handle old string format
|
||
if isinstance(label_config, str):
|
||
return {
|
||
"name": label_config,
|
||
"color": "1d76db", # Default blue
|
||
"description": "",
|
||
"aliases": [],
|
||
}
|
||
|
||
# Handle new dict format
|
||
return {
|
||
"name": label_config.get("name", ""),
|
||
"color": label_config.get("color", "1d76db"),
|
||
"description": label_config.get("description", ""),
|
||
"aliases": label_config.get("aliases", []),
|
||
}
|
||
|
||
def can_handle(self, event_type: str, event_data: dict) -> bool:
|
||
"""Check if this agent handles the given event."""
|
||
# Check if agent is enabled
|
||
agent_config = self.config.get("agents", {}).get("pr", {})
|
||
if not agent_config.get("enabled", True):
|
||
return False
|
||
|
||
if event_type == "pull_request":
|
||
action = event_data.get("action", "")
|
||
allowed_events = agent_config.get("events", ["opened", "synchronize"])
|
||
return action in allowed_events
|
||
|
||
# Handle issue comments on PRs (for review-again and summarize commands)
|
||
if event_type == "issue_comment":
|
||
action = event_data.get("action", "")
|
||
if action == "created":
|
||
comment_body = event_data.get("comment", {}).get("body", "")
|
||
mention_prefix = self.config.get("interaction", {}).get(
|
||
"mention_prefix", "@codebot"
|
||
)
|
||
# Only handle if this is a PR
|
||
issue = event_data.get("issue", {})
|
||
is_pr = issue.get("pull_request") is not None
|
||
has_review_again = (
|
||
f"{mention_prefix} review-again" in comment_body.lower()
|
||
)
|
||
has_summarize = f"{mention_prefix} summarize" in comment_body.lower()
|
||
return is_pr and (has_review_again or has_summarize)
|
||
|
||
return False
|
||
|
||
def execute(self, context: AgentContext) -> AgentResult:
|
||
"""Execute the PR review agent."""
|
||
# Check if this is a comment-based command
|
||
if context.event_type == "issue_comment":
|
||
comment_body = context.event_data.get("comment", {}).get("body", "")
|
||
mention_prefix = self.config.get("interaction", {}).get(
|
||
"mention_prefix", "@codebot"
|
||
)
|
||
if f"{mention_prefix} summarize" in comment_body.lower():
|
||
return self._handle_summarize_command(context)
|
||
elif f"{mention_prefix} review-again" in comment_body.lower():
|
||
return self._handle_review_again(context)
|
||
|
||
pr = context.event_data.get("pull_request", {})
|
||
pr_number = pr.get("number")
|
||
|
||
self.logger.info(f"Reviewing PR #{pr_number}: {pr.get('title')}")
|
||
|
||
actions_taken = []
|
||
|
||
# Check if PR has empty description and auto-summary is enabled
|
||
pr_body = pr.get("body", "").strip()
|
||
agent_config = self.config.get("agents", {}).get("pr", {})
|
||
auto_summary_enabled = agent_config.get("auto_summary", {}).get("enabled", True)
|
||
|
||
if (
|
||
not pr_body
|
||
and auto_summary_enabled
|
||
and context.event_data.get("action") == "opened"
|
||
):
|
||
# Generate and post summary for empty PR descriptions
|
||
summary_result = self._generate_pr_summary(
|
||
context.owner, context.repo, pr_number
|
||
)
|
||
if summary_result:
|
||
actions_taken.append("Generated PR summary for empty description")
|
||
# Don't return here - continue with regular review
|
||
|
||
# Step 1: Get PR diff
|
||
diff = self._get_diff(context.owner, context.repo, pr_number)
|
||
if not diff.strip():
|
||
return AgentResult(
|
||
success=True,
|
||
message="PR has no changes to review",
|
||
)
|
||
|
||
# Step 2: Parse changed files
|
||
changed_files = self._parse_diff_files(diff)
|
||
|
||
# Step 3: Run security scan if enabled
|
||
security_issues = []
|
||
agent_config = self.config.get("agents", {}).get("pr", {})
|
||
if agent_config.get("security_scan", True):
|
||
security_issues = self._run_security_scan(changed_files, diff)
|
||
if security_issues:
|
||
actions_taken.append(f"Found {len(security_issues)} security issues")
|
||
|
||
# Step 4: Run AI review
|
||
review_result = self._run_ai_review(diff, context, security_issues)
|
||
|
||
# Step 5: Post inline comments if enabled
|
||
if agent_config.get("inline_comments", True) and review_result.issues:
|
||
inline_count = self._post_inline_comments(
|
||
context.owner, context.repo, pr_number, review_result
|
||
)
|
||
actions_taken.append(f"Posted {inline_count} inline comments")
|
||
|
||
# Step 6: Post summary comment
|
||
summary_comment = self._generate_summary_comment(review_result)
|
||
self.upsert_comment(
|
||
context.owner,
|
||
context.repo,
|
||
pr_number,
|
||
summary_comment,
|
||
marker=self.PR_AI_MARKER,
|
||
)
|
||
actions_taken.append("Posted summary comment")
|
||
|
||
# Step 7: Apply labels
|
||
labels_applied = self._apply_review_labels(
|
||
context.owner, context.repo, pr_number, review_result
|
||
)
|
||
if labels_applied:
|
||
actions_taken.append(f"Applied labels: {labels_applied}")
|
||
|
||
return AgentResult(
|
||
success=True,
|
||
message=f"Reviewed PR #{pr_number}: {review_result.overall_severity} severity",
|
||
data={
|
||
"severity": review_result.overall_severity,
|
||
"approval": review_result.approval,
|
||
"issues_count": len(review_result.issues),
|
||
"security_issues_count": len(review_result.security_issues),
|
||
},
|
||
actions_taken=actions_taken,
|
||
)
|
||
|
||
def _get_diff(self, owner: str, repo: str, pr_number: int) -> str:
|
||
"""Get the PR diff, truncated if necessary."""
|
||
max_lines = self.config.get("review", {}).get("max_diff_lines", 800)
|
||
|
||
try:
|
||
diff = self.gitea.get_pull_request_diff(owner, repo, pr_number)
|
||
lines = diff.splitlines()
|
||
if len(lines) > max_lines:
|
||
return "\n".join(lines[:max_lines])
|
||
return diff
|
||
except Exception as e:
|
||
self.logger.error(f"Failed to get diff: {e}")
|
||
return ""
|
||
|
||
def _parse_diff_files(self, diff: str) -> dict[str, str]:
|
||
"""Parse diff into file -> content mapping."""
|
||
files = {}
|
||
current_file = None
|
||
current_content = []
|
||
|
||
for line in diff.splitlines():
|
||
if line.startswith("diff --git"):
|
||
if current_file:
|
||
files[current_file] = "\n".join(current_content)
|
||
# Extract file path from "diff --git a/path b/path"
|
||
match = re.search(r"b/(.+)$", line)
|
||
if match:
|
||
current_file = match.group(1)
|
||
current_content = []
|
||
elif current_file:
|
||
current_content.append(line)
|
||
|
||
if current_file:
|
||
files[current_file] = "\n".join(current_content)
|
||
|
||
return files
|
||
|
||
def _run_security_scan(
|
||
self, changed_files: dict[str, str], diff: str
|
||
) -> list[ReviewIssue]:
|
||
"""Run security pattern scanning on the diff."""
|
||
issues = []
|
||
|
||
# Security patterns to detect
|
||
patterns = [
|
||
{
|
||
"name": "Hardcoded Secrets",
|
||
"pattern": r'(?i)(api_key|apikey|secret|password|token|auth)\s*[=:]\s*["\'][^"\']{8,}["\']',
|
||
"severity": "HIGH",
|
||
"category": "Security",
|
||
"description": "Potential hardcoded secret or API key detected",
|
||
"recommendation": "Move secrets to environment variables or a secrets manager",
|
||
},
|
||
{
|
||
"name": "SQL Injection",
|
||
"pattern": r'(?i)(execute|query)\s*\([^)]*\+[^)]*\)|f["\'].*\{.*\}.*(?:SELECT|INSERT|UPDATE|DELETE)',
|
||
"severity": "HIGH",
|
||
"category": "Security",
|
||
"description": "Potential SQL injection vulnerability - string concatenation in query",
|
||
"recommendation": "Use parameterized queries or prepared statements",
|
||
},
|
||
{
|
||
"name": "Hardcoded IP",
|
||
"pattern": r"\b(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b",
|
||
"severity": "LOW",
|
||
"category": "Security",
|
||
"description": "Hardcoded IP address detected",
|
||
"recommendation": "Consider using configuration or DNS names instead",
|
||
},
|
||
{
|
||
"name": "Eval Usage",
|
||
"pattern": r"\beval\s*\(",
|
||
"severity": "HIGH",
|
||
"category": "Security",
|
||
"description": "Use of eval() detected - potential code injection risk",
|
||
"recommendation": "Avoid eval() - use safer alternatives like ast.literal_eval() for Python",
|
||
},
|
||
{
|
||
"name": "Shell Injection",
|
||
"pattern": r"(?i)(?:subprocess\.call|os\.system|shell\s*=\s*True)",
|
||
"severity": "MEDIUM",
|
||
"category": "Security",
|
||
"description": "Potential shell command execution - verify input is sanitized",
|
||
"recommendation": "Use subprocess with shell=False and pass arguments as a list",
|
||
},
|
||
]
|
||
|
||
for filename, content in changed_files.items():
|
||
# Only check added lines (starting with +)
|
||
added_lines = []
|
||
line_numbers = []
|
||
current_line = 0
|
||
|
||
for line in content.splitlines():
|
||
if line.startswith("@@"):
|
||
# Parse line number from @@ -x,y +a,b @@
|
||
match = re.search(r"\+(\d+)", line)
|
||
if match:
|
||
current_line = int(match.group(1)) - 1
|
||
elif line.startswith("+") and not line.startswith("+++"):
|
||
current_line += 1
|
||
added_lines.append((current_line, line[1:]))
|
||
elif not line.startswith("-"):
|
||
current_line += 1
|
||
|
||
# Check patterns on added lines
|
||
for line_num, line_content in added_lines:
|
||
for pattern_def in patterns:
|
||
if re.search(pattern_def["pattern"], line_content):
|
||
issues.append(
|
||
ReviewIssue(
|
||
file=filename,
|
||
line=line_num,
|
||
severity=pattern_def["severity"],
|
||
category=pattern_def["category"],
|
||
description=pattern_def["description"],
|
||
recommendation=pattern_def["recommendation"],
|
||
code_snippet=line_content.strip()[:100],
|
||
)
|
||
)
|
||
|
||
return issues
|
||
|
||
def _run_ai_review(
|
||
self,
|
||
diff: str,
|
||
context: AgentContext,
|
||
security_issues: list[ReviewIssue],
|
||
) -> PRReviewResult:
|
||
"""Run AI-based code review."""
|
||
prompt_template = self.load_prompt("base")
|
||
|
||
# Add security context if issues were found
|
||
security_context = ""
|
||
if security_issues:
|
||
security_context = "\n\nSECURITY SCAN RESULTS (already detected):\n"
|
||
for issue in security_issues[:5]: # Limit to first 5
|
||
security_context += f"- [{issue.severity}] {issue.file}:{issue.line} - {issue.description}\n"
|
||
|
||
prompt = f"{prompt_template}\n{security_context}\nDIFF:\n{diff}"
|
||
|
||
try:
|
||
result = self.call_llm_json(prompt)
|
||
|
||
issues = []
|
||
for issue_data in result.get("issues", []):
|
||
issues.append(
|
||
ReviewIssue(
|
||
file=issue_data.get("file", "unknown"),
|
||
line=issue_data.get("line"),
|
||
severity=issue_data.get("severity", "MEDIUM"),
|
||
category=issue_data.get("category", "General"),
|
||
description=issue_data.get("description", ""),
|
||
recommendation=issue_data.get("recommendation", ""),
|
||
code_snippet=issue_data.get("code_snippet"),
|
||
)
|
||
)
|
||
|
||
return PRReviewResult(
|
||
summary=result.get("summary", "Review completed"),
|
||
issues=issues,
|
||
overall_severity=result.get("overall_severity", "LOW"),
|
||
approval=result.get("approval", True),
|
||
security_issues=security_issues,
|
||
)
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"AI review failed: {e}")
|
||
return PRReviewResult(
|
||
summary=f"AI review encountered an error: {e}",
|
||
issues=[],
|
||
overall_severity="UNKNOWN",
|
||
approval=False,
|
||
security_issues=security_issues,
|
||
)
|
||
|
||
def _post_inline_comments(
|
||
self,
|
||
owner: str,
|
||
repo: str,
|
||
pr_number: int,
|
||
review: PRReviewResult,
|
||
) -> int:
|
||
"""Post inline comments for issues with line numbers."""
|
||
comments = []
|
||
|
||
all_issues = review.issues + review.security_issues
|
||
for issue in all_issues:
|
||
if issue.line and issue.file:
|
||
comment_body = (
|
||
f"**[{issue.severity}] {issue.category}**\n\n"
|
||
f"{issue.description}\n\n"
|
||
f"**Recommendation:** {issue.recommendation}"
|
||
)
|
||
comments.append(
|
||
{
|
||
"path": issue.file,
|
||
"line": issue.line,
|
||
"body": comment_body,
|
||
}
|
||
)
|
||
|
||
if not comments:
|
||
return 0
|
||
|
||
try:
|
||
# Use Gitea's pull request review API for inline comments
|
||
self.gitea.create_pull_request_review(
|
||
owner=owner,
|
||
repo=repo,
|
||
index=pr_number,
|
||
body="AI Code Review - Inline Comments",
|
||
event="COMMENT",
|
||
comments=comments[:10], # Limit to 10 inline comments
|
||
)
|
||
return min(len(comments), 10)
|
||
except Exception as e:
|
||
self.logger.warning(f"Failed to post inline comments: {e}")
|
||
return 0
|
||
|
||
def _generate_summary_comment(self, review: PRReviewResult) -> str:
|
||
"""Generate the summary comment for the PR."""
|
||
lines = [
|
||
f"{self.AI_DISCLAIMER}",
|
||
"",
|
||
"## AI Code Review",
|
||
"",
|
||
review.summary,
|
||
"",
|
||
]
|
||
|
||
# Statistics
|
||
all_issues = review.issues + review.security_issues
|
||
high = sum(1 for i in all_issues if i.severity == "HIGH")
|
||
medium = sum(1 for i in all_issues if i.severity == "MEDIUM")
|
||
low = sum(1 for i in all_issues if i.severity == "LOW")
|
||
|
||
lines.append("### Summary")
|
||
lines.append("")
|
||
lines.append(f"| Severity | Count |")
|
||
lines.append(f"|----------|-------|")
|
||
lines.append(f"| HIGH | {high} |")
|
||
lines.append(f"| MEDIUM | {medium} |")
|
||
lines.append(f"| LOW | {low} |")
|
||
lines.append("")
|
||
|
||
# Security issues section
|
||
if review.security_issues:
|
||
lines.append("### Security Issues")
|
||
lines.append("")
|
||
for issue in review.security_issues[:5]:
|
||
lines.append(
|
||
f"- **[{issue.severity}]** `{issue.file}:{issue.line}` - {issue.description}"
|
||
)
|
||
lines.append("")
|
||
|
||
# Other issues (limit display)
|
||
other_issues = [i for i in review.issues if i not in review.security_issues]
|
||
if other_issues:
|
||
lines.append("### Review Findings")
|
||
lines.append("")
|
||
for issue in other_issues[:10]:
|
||
loc = (
|
||
f"`{issue.file}:{issue.line}`" if issue.line else f"`{issue.file}`"
|
||
)
|
||
lines.append(f"- **[{issue.severity}]** {loc} - {issue.description}")
|
||
if len(other_issues) > 10:
|
||
lines.append(f"- ...and {len(other_issues) - 10} more issues")
|
||
lines.append("")
|
||
|
||
# Verdict
|
||
lines.append("---")
|
||
lines.append(f"**Overall Severity:** `{review.overall_severity}`")
|
||
if review.approval:
|
||
lines.append("**AI Recommendation:** Approve")
|
||
else:
|
||
lines.append("**AI Recommendation:** Changes Requested")
|
||
|
||
return "\n".join(lines)
|
||
|
||
def _apply_review_labels(
|
||
self,
|
||
owner: str,
|
||
repo: str,
|
||
pr_number: int,
|
||
review: PRReviewResult,
|
||
) -> list[str]:
|
||
"""Apply labels based on review result."""
|
||
try:
|
||
repo_labels = self.gitea.get_repo_labels(owner, repo)
|
||
label_map = {l["name"]: l["id"] for l in repo_labels}
|
||
except Exception as e:
|
||
self.logger.warning(f"Failed to get repo labels: {e}")
|
||
return []
|
||
|
||
def _handle_review_again(self, context: AgentContext) -> AgentResult:
|
||
"""Re-run PR review on current state."""
|
||
issue = context.event_data.get("issue", {})
|
||
pr_number = issue.get("number")
|
||
comment_author = (
|
||
context.event_data.get("comment", {}).get("user", {}).get("login", "user")
|
||
)
|
||
|
||
self.logger.info(f"Re-reviewing PR #{pr_number} at user request")
|
||
|
||
# Get previous review comment
|
||
previous_comment = self._find_previous_review(
|
||
context.owner, context.repo, pr_number
|
||
)
|
||
previous_findings = []
|
||
if previous_comment:
|
||
previous_findings = self._parse_review_comment(previous_comment)
|
||
|
||
# Run new review (reuse existing review logic)
|
||
actions_taken = []
|
||
|
||
# Step 1: Get PR diff
|
||
diff = self._get_diff(context.owner, context.repo, pr_number)
|
||
if not diff.strip():
|
||
response = f"@{comment_author}\n\n{self.AI_DISCLAIMER}\n\n**🔄 Re-review Requested**\n\nPR has no changes to review."
|
||
self.gitea.create_issue_comment(
|
||
context.owner, context.repo, pr_number, response
|
||
)
|
||
return AgentResult(
|
||
success=True,
|
||
message="PR has no changes to review",
|
||
)
|
||
|
||
# Step 2: Parse changed files
|
||
changed_files = self._parse_diff_files(diff)
|
||
|
||
# Step 3: Run security scan if enabled
|
||
security_issues = []
|
||
agent_config = self.config.get("agents", {}).get("pr", {})
|
||
if agent_config.get("security_scan", True):
|
||
security_issues = self._run_security_scan(changed_files, diff)
|
||
|
||
# Step 4: Run AI review
|
||
review_result = self._run_ai_review(diff, context, security_issues)
|
||
|
||
# Step 5: Compare with previous review
|
||
current_findings = self._extract_findings_from_review(review_result)
|
||
diff_result = self._compare_reviews(previous_findings, current_findings)
|
||
|
||
# Step 6: Generate updated review with comparison
|
||
updated_review = self._format_review_update(
|
||
review_result, diff_result, comment_author
|
||
)
|
||
|
||
# Step 7: Update existing comment (or create new one)
|
||
self.upsert_comment(
|
||
context.owner,
|
||
context.repo,
|
||
pr_number,
|
||
updated_review,
|
||
marker=self.PR_AI_MARKER,
|
||
)
|
||
actions_taken.append("Updated review comment")
|
||
|
||
# Step 8: Update PR labels
|
||
labels_applied = self._apply_review_labels(
|
||
context.owner, context.repo, pr_number, review_result
|
||
)
|
||
if labels_applied:
|
||
actions_taken.append(f"Updated labels: {labels_applied}")
|
||
|
||
return AgentResult(
|
||
success=True,
|
||
message=f"Re-reviewed PR #{pr_number}: {review_result.overall_severity} severity",
|
||
data={
|
||
"severity": review_result.overall_severity,
|
||
"approval": review_result.approval,
|
||
"issues_count": len(review_result.issues),
|
||
"security_issues_count": len(review_result.security_issues),
|
||
"resolved_count": len(diff_result.get("resolved", [])),
|
||
"new_count": len(diff_result.get("new", [])),
|
||
},
|
||
actions_taken=actions_taken,
|
||
)
|
||
|
||
def _find_previous_review(
|
||
self, owner: str, repo: str, pr_number: int
|
||
) -> str | None:
|
||
"""Find the previous AI review comment."""
|
||
comment_id = self.find_ai_comment(
|
||
owner, repo, pr_number, marker=self.PR_AI_MARKER
|
||
)
|
||
if not comment_id:
|
||
return None
|
||
|
||
# Get the comment content
|
||
comments = self.gitea.list_issue_comments(owner, repo, pr_number)
|
||
for comment in comments:
|
||
if comment.get("id") == comment_id:
|
||
return comment.get("body", "")
|
||
|
||
return None
|
||
|
||
def _parse_review_comment(self, comment_text: str) -> list[dict]:
|
||
"""Parse previous review comment to extract findings.
|
||
|
||
Returns:
|
||
List of findings with file, line, severity, description
|
||
"""
|
||
findings = []
|
||
|
||
if not comment_text:
|
||
return findings
|
||
|
||
# Look for patterns like: **[HIGH]** `src/file.py:45` - Description
|
||
pattern = r"\*\*\[(\w+)\]\*\*\s+`([^:]+):(\d+)`\s+-\s+(.+?)(?:\n|$)"
|
||
|
||
for match in re.finditer(pattern, comment_text):
|
||
findings.append(
|
||
{
|
||
"severity": match.group(1),
|
||
"file": match.group(2),
|
||
"line": int(match.group(3)),
|
||
"description": match.group(4).strip(),
|
||
}
|
||
)
|
||
|
||
return findings
|
||
|
||
def _extract_findings_from_review(self, review: PRReviewResult) -> list[dict]:
|
||
"""Extract findings from PRReviewResult into comparable format."""
|
||
findings = []
|
||
all_issues = review.issues + review.security_issues
|
||
|
||
for issue in all_issues:
|
||
findings.append(
|
||
{
|
||
"severity": issue.severity,
|
||
"file": issue.file,
|
||
"line": issue.line or 0,
|
||
"description": issue.description,
|
||
"category": issue.category,
|
||
}
|
||
)
|
||
|
||
return findings
|
||
|
||
def _finding_key(self, finding: dict) -> str:
|
||
"""Create unique key for a finding."""
|
||
file_path = finding.get("file", "unknown")
|
||
line = finding.get("line", 0)
|
||
# Use first 50 chars of description for matching
|
||
desc_key = finding.get("description", "")[:50]
|
||
return f"{file_path}:{line}:{desc_key}"
|
||
|
||
def _compare_reviews(
|
||
self, previous_findings: list[dict], new_findings: list[dict]
|
||
) -> dict:
|
||
"""Compare previous and new review to show what changed.
|
||
|
||
Returns:
|
||
{
|
||
"resolved": [...], # Issues that disappeared
|
||
"new": [...], # New issues found
|
||
"still_present": [...], # Issues that remain
|
||
"severity_changed": {...} # OLD severity -> NEW severity
|
||
}
|
||
"""
|
||
# Create lookup keys (file:line:description)
|
||
prev_keys = {self._finding_key(f): f for f in previous_findings}
|
||
new_keys = {self._finding_key(f): f for f in new_findings}
|
||
|
||
resolved = [prev_keys[key] for key in prev_keys if key not in new_keys]
|
||
|
||
new = [new_keys[key] for key in new_keys if key not in prev_keys]
|
||
|
||
still_present = [new_keys[key] for key in new_keys if key in prev_keys]
|
||
|
||
severity_changed = {}
|
||
for key in prev_keys:
|
||
if key in new_keys:
|
||
prev_severity = prev_keys[key].get("severity")
|
||
new_severity = new_keys[key].get("severity")
|
||
if prev_severity != new_severity:
|
||
severity_changed[key] = {
|
||
"old": prev_severity,
|
||
"new": new_severity,
|
||
"finding": new_keys[key],
|
||
}
|
||
|
||
return {
|
||
"resolved": resolved,
|
||
"new": new,
|
||
"still_present": still_present,
|
||
"severity_changed": severity_changed,
|
||
}
|
||
|
||
def _format_review_update(
|
||
self, review: PRReviewResult, diff: dict, comment_author: str
|
||
) -> str:
|
||
"""Format review with comparison to previous run."""
|
||
lines = [f"@{comment_author}\n"]
|
||
lines.append(f"{self.AI_DISCLAIMER}\n")
|
||
lines.append("**🔄 Re-review Requested**\n")
|
||
lines.append("---\n")
|
||
lines.append("## AI Code Review (Updated)\n")
|
||
|
||
# Summary of changes
|
||
prev_total = len(diff["resolved"]) + len(diff["still_present"])
|
||
curr_total = len(diff["new"]) + len(diff["still_present"])
|
||
|
||
if prev_total > 0:
|
||
lines.append(f"**Previous Review:** {prev_total} issues")
|
||
lines.append(f"**Current Review:** {curr_total} issues\n")
|
||
else:
|
||
lines.append("**First Review** - No previous review found\n")
|
||
|
||
# Changes section (only if there was a previous review)
|
||
if prev_total > 0:
|
||
lines.append("### Changes from Previous Review\n")
|
||
|
||
if diff["resolved"]:
|
||
lines.append(f"**✅ Resolved ({len(diff['resolved'])}):**")
|
||
for finding in diff["resolved"][:5]: # Show max 5
|
||
lines.append(
|
||
f"- **[{finding['severity']}]** `{finding['file']}:{finding['line']}` - {finding['description']}"
|
||
)
|
||
if len(diff["resolved"]) > 5:
|
||
lines.append(f"- ... and {len(diff['resolved']) - 5} more")
|
||
lines.append("")
|
||
|
||
if diff["new"]:
|
||
lines.append(f"**⚠️ New Issues ({len(diff['new'])}):**")
|
||
for finding in diff["new"][:5]:
|
||
lines.append(
|
||
f"- **[{finding['severity']}]** `{finding['file']}:{finding['line']}` - {finding['description']}"
|
||
)
|
||
if len(diff["new"]) > 5:
|
||
lines.append(f"- ... and {len(diff['new']) - 5} more")
|
||
lines.append("")
|
||
|
||
if diff["severity_changed"]:
|
||
lines.append(
|
||
f"**🔄 Severity Changed ({len(diff['severity_changed'])}):**"
|
||
)
|
||
for key, change in list(diff["severity_changed"].items())[:5]:
|
||
finding = change["finding"]
|
||
lines.append(
|
||
f"- `{finding['file']}:{finding['line']}` - {change['old']} → {change['new']}"
|
||
)
|
||
lines.append("")
|
||
|
||
# Summary table
|
||
all_issues = review.issues + review.security_issues
|
||
high = sum(1 for i in all_issues if i.severity == "HIGH")
|
||
medium = sum(1 for i in all_issues if i.severity == "MEDIUM")
|
||
low = sum(1 for i in all_issues if i.severity == "LOW")
|
||
|
||
lines.append("### Summary\n")
|
||
lines.append("| Severity | Count |")
|
||
lines.append("|----------|-------|")
|
||
lines.append(f"| HIGH | {high} |")
|
||
lines.append(f"| MEDIUM | {medium} |")
|
||
lines.append(f"| LOW | {low} |")
|
||
lines.append("")
|
||
|
||
# Security issues section (if any)
|
||
if review.security_issues:
|
||
lines.append("### Security Issues\n")
|
||
for issue in review.security_issues[:5]:
|
||
loc = (
|
||
f"`{issue.file}:{issue.line}`" if issue.line else f"`{issue.file}`"
|
||
)
|
||
lines.append(f"- **[{issue.severity}]** {loc} - {issue.description}")
|
||
if len(review.security_issues) > 5:
|
||
lines.append(f"- ... and {len(review.security_issues) - 5} more")
|
||
lines.append("")
|
||
|
||
# Other issues (limit display)
|
||
other_issues = [i for i in review.issues if i not in review.security_issues]
|
||
if other_issues:
|
||
lines.append("### Review Findings\n")
|
||
for issue in other_issues[:10]:
|
||
loc = (
|
||
f"`{issue.file}:{issue.line}`" if issue.line else f"`{issue.file}`"
|
||
)
|
||
lines.append(f"- **[{issue.severity}]** {loc} - {issue.description}")
|
||
if len(other_issues) > 10:
|
||
lines.append(f"- ... and {len(other_issues) - 10} more issues")
|
||
lines.append("")
|
||
|
||
# Verdict
|
||
lines.append("---")
|
||
lines.append(f"**Overall Severity:** `{review.overall_severity}`")
|
||
if review.approval:
|
||
lines.append("**AI Recommendation:** Approved ✅")
|
||
else:
|
||
lines.append("**AI Recommendation:** Changes Requested ⚠️")
|
||
|
||
return "\n".join(lines)
|
||
|
||
labels_to_add = []
|
||
|
||
# Add approval/changes required label
|
||
# Use helper to support both old string and new dict format
|
||
if review.approval:
|
||
label_config = self._get_label_config("status", "ai_approved")
|
||
else:
|
||
label_config = self._get_label_config("status", "ai_changes_required")
|
||
|
||
label_name = label_config.get("name", "")
|
||
|
||
if label_name and label_name in label_map:
|
||
labels_to_add.append(label_map[label_name])
|
||
|
||
if labels_to_add:
|
||
try:
|
||
self.gitea.add_issue_labels(owner, repo, pr_number, labels_to_add)
|
||
return [name for name, id in label_map.items() if id in labels_to_add]
|
||
except Exception as e:
|
||
self.logger.warning(f"Failed to add labels: {e}")
|
||
|
||
return []
|
||
|
||
def _generate_pr_summary(self, owner: str, repo: str, pr_number: int) -> bool:
|
||
"""Generate and post a summary for a PR.
|
||
|
||
Args:
|
||
owner: Repository owner
|
||
repo: Repository name
|
||
pr_number: PR number
|
||
|
||
Returns:
|
||
True if summary was generated successfully, False otherwise
|
||
"""
|
||
try:
|
||
# Get PR diff
|
||
diff = self._get_diff(owner, repo, pr_number)
|
||
if not diff.strip():
|
||
self.logger.info(f"No diff to summarize for PR #{pr_number}")
|
||
return False
|
||
|
||
# Load summary prompt
|
||
prompt_template = self.load_prompt("pr_summary")
|
||
prompt = f"{prompt_template}\n{diff}"
|
||
|
||
# Call LLM to generate summary
|
||
result = self.call_llm_json(prompt)
|
||
|
||
# Format the summary comment
|
||
summary_comment = self._format_pr_summary(result)
|
||
|
||
# Post as first comment (or update PR description based on config)
|
||
agent_config = self.config.get("agents", {}).get("pr", {})
|
||
auto_summary_config = agent_config.get("auto_summary", {})
|
||
post_as_comment = auto_summary_config.get("post_as_comment", True)
|
||
|
||
if post_as_comment:
|
||
# Post as comment
|
||
self.gitea.create_issue_comment(owner, repo, pr_number, summary_comment)
|
||
self.logger.info(f"Posted PR summary as comment for PR #{pr_number}")
|
||
else:
|
||
# Update PR description (requires different API call)
|
||
# Note: Gitea API may not support updating PR description
|
||
# In that case, fall back to posting as comment
|
||
try:
|
||
self.gitea.update_pull_request(
|
||
owner, repo, pr_number, body=summary_comment
|
||
)
|
||
self.logger.info(
|
||
f"Updated PR description with summary for PR #{pr_number}"
|
||
)
|
||
except Exception as e:
|
||
self.logger.warning(
|
||
f"Could not update PR description, posting as comment: {e}"
|
||
)
|
||
self.gitea.create_issue_comment(
|
||
owner, repo, pr_number, summary_comment
|
||
)
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Failed to generate PR summary: {e}")
|
||
return False
|
||
|
||
def _format_pr_summary(self, summary_data: dict) -> str:
|
||
"""Format the PR summary data into a readable comment.
|
||
|
||
Args:
|
||
summary_data: JSON data from LLM containing summary information
|
||
|
||
Returns:
|
||
Formatted markdown comment
|
||
"""
|
||
lines = [
|
||
self.AI_DISCLAIMER,
|
||
"",
|
||
"## 📋 Pull Request Summary",
|
||
"",
|
||
summary_data.get("summary", "Summary unavailable"),
|
||
"",
|
||
]
|
||
|
||
# Change type
|
||
change_type = summary_data.get("change_type", "Unknown")
|
||
change_type_emoji = {
|
||
"Feature": "✨",
|
||
"Bugfix": "🐛",
|
||
"Refactor": "♻️",
|
||
"Documentation": "📚",
|
||
"Testing": "🧪",
|
||
"Mixed": "🔀",
|
||
}
|
||
emoji = change_type_emoji.get(change_type, "🔀")
|
||
lines.append(f"**Type:** {emoji} {change_type}")
|
||
lines.append("")
|
||
|
||
# Key changes
|
||
key_changes = summary_data.get("key_changes", {})
|
||
if key_changes:
|
||
lines.append("## Changes")
|
||
lines.append("")
|
||
|
||
added = key_changes.get("added", [])
|
||
if added:
|
||
lines.append("**✅ Added:**")
|
||
for item in added:
|
||
lines.append(f"- {item}")
|
||
lines.append("")
|
||
|
||
modified = key_changes.get("modified", [])
|
||
if modified:
|
||
lines.append("**📝 Modified:**")
|
||
for item in modified:
|
||
lines.append(f"- {item}")
|
||
lines.append("")
|
||
|
||
removed = key_changes.get("removed", [])
|
||
if removed:
|
||
lines.append("**❌ Removed:**")
|
||
for item in removed:
|
||
lines.append(f"- {item}")
|
||
lines.append("")
|
||
|
||
# Files affected
|
||
files = summary_data.get("files_affected", [])
|
||
if files:
|
||
lines.append("## Files Affected")
|
||
lines.append("")
|
||
for file_info in files[:10]: # Limit to first 10 files
|
||
path = file_info.get("path", "unknown")
|
||
desc = file_info.get("description", "")
|
||
change_type = file_info.get("change_type", "modified")
|
||
|
||
type_icon = {"added": "➕", "modified": "📝", "deleted": "➖"}
|
||
icon = type_icon.get(change_type, "📝")
|
||
|
||
lines.append(f"- {icon} `{path}` - {desc}")
|
||
|
||
if len(files) > 10:
|
||
lines.append(f"- ... and {len(files) - 10} more files")
|
||
lines.append("")
|
||
|
||
# Impact assessment
|
||
impact = summary_data.get("impact", {})
|
||
if impact:
|
||
scope = impact.get("scope", "unknown")
|
||
description = impact.get("description", "")
|
||
|
||
scope_emoji = {"small": "🟢", "medium": "🟡", "large": "🔴"}
|
||
emoji = scope_emoji.get(scope, "⚪")
|
||
|
||
lines.append("## Impact")
|
||
lines.append(f"{emoji} **Scope:** {scope.capitalize()}")
|
||
lines.append(f"{description}")
|
||
|
||
return "\n".join(lines)
|
||
|
||
def _handle_summarize_command(self, context: AgentContext) -> AgentResult:
|
||
"""Handle @codebot summarize command from PR comments.
|
||
|
||
Args:
|
||
context: Agent context with event data
|
||
|
||
Returns:
|
||
AgentResult with success status and actions taken
|
||
"""
|
||
issue = context.event_data.get("issue", {})
|
||
pr_number = issue.get("number")
|
||
comment_author = (
|
||
context.event_data.get("comment", {}).get("user", {}).get("login", "user")
|
||
)
|
||
|
||
self.logger.info(f"Generating PR summary for PR #{pr_number} at user request")
|
||
|
||
# Generate and post summary
|
||
success = self._generate_pr_summary(context.owner, context.repo, pr_number)
|
||
|
||
if success:
|
||
return AgentResult(
|
||
success=True,
|
||
message=f"Generated PR summary for PR #{pr_number}",
|
||
actions_taken=["Posted PR summary comment"],
|
||
)
|
||
else:
|
||
# Post error message
|
||
error_msg = (
|
||
f"@{comment_author}\n\n"
|
||
f"{self.AI_DISCLAIMER}\n\n"
|
||
"**⚠️ Summary Generation Failed**\n\n"
|
||
"I was unable to generate a summary for this PR. "
|
||
"This could be because:\n"
|
||
"- The PR has no changes\n"
|
||
"- There was an error accessing the diff\n"
|
||
"- The LLM service is unavailable"
|
||
)
|
||
self.gitea.create_issue_comment(
|
||
context.owner, context.repo, pr_number, error_msg
|
||
)
|
||
|
||
return AgentResult(
|
||
success=False,
|
||
message=f"Failed to generate PR summary for PR #{pr_number}",
|
||
error="Summary generation failed",
|
||
)
|