"""Pull Request Review Agent

Enhanced AI agent for comprehensive PR reviews with inline comments,
security scanning, and automatic label management.
"""

import re
from dataclasses import dataclass, field

from agents.base_agent import AgentContext, AgentResult, BaseAgent

# Commands recognised after the mention prefix in PR comments, listed in the
# order in which they are checked.
_COMMENT_COMMANDS = ("summarize", "changelog", "review-again")

# Maximum number of inline comments submitted in a single review.
_MAX_INLINE_COMMENTS = 10

# Regex-based security checks applied to every added line of a diff.
# Compiled once at import time instead of once per scanned line.
_SECURITY_PATTERNS = (
    {
        "name": "Hardcoded Secrets",
        "regex": re.compile(
            r'(?i)(api_key|apikey|secret|password|token|auth)\s*[=:]\s*["\'][^"\']{8,}["\']'
        ),
        "severity": "HIGH",
        "category": "Security",
        "description": "Potential hardcoded secret or API key detected",
        "recommendation": "Move secrets to environment variables or a secrets manager",
    },
    {
        "name": "SQL Injection",
        "regex": re.compile(
            r'(?i)(execute|query)\s*\([^)]*\+[^)]*\)|f["\'].*\{.*\}.*(?:SELECT|INSERT|UPDATE|DELETE)'
        ),
        "severity": "HIGH",
        "category": "Security",
        "description": "Potential SQL injection vulnerability - string concatenation in query",
        "recommendation": "Use parameterized queries or prepared statements",
    },
    {
        "name": "Hardcoded IP",
        "regex": re.compile(
            r"\b(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b"
        ),
        "severity": "LOW",
        "category": "Security",
        "description": "Hardcoded IP address detected",
        "recommendation": "Consider using configuration or DNS names instead",
    },
    {
        "name": "Eval Usage",
        "regex": re.compile(r"\beval\s*\("),
        "severity": "HIGH",
        "category": "Security",
        "description": "Use of eval() detected - potential code injection risk",
        "recommendation": "Avoid eval() - use safer alternatives like ast.literal_eval() for Python",
    },
    {
        "name": "Shell Injection",
        "regex": re.compile(r"(?i)(?:subprocess\.call|os\.system|shell\s*=\s*True)"),
        "severity": "MEDIUM",
        "category": "Security",
        "description": "Potential shell command execution - verify input is sanitized",
        "recommendation": "Use subprocess with shell=False and pass arguments as a list",
    },
)

# Matches one rendered finding bullet, e.g.
#   **[HIGH]** `src/file.py:45` - Description
# The ":line" suffix is optional because findings without a line number are
# rendered as `file` only.
_FINDING_LINE_RE = re.compile(
    r"\*\*\[(\w+)\]\*\*\s+`([^`\n]+?)(?::(\d+))?`\s+-\s+(.+?)(?:\n|$)"
)

# Keep a Changelog sections: (key in the LLM payload, rendered heading).
_CHANGELOG_SECTIONS = (
    ("added", "### ✨ Added"),
    ("changed", "### 🔄 Changed"),
    ("deprecated", "### ⚠️ Deprecated"),
    ("removed", "### 🗑️ Removed"),
    ("fixed", "### 🐛 Fixed"),
    ("security", "### 🔒 Security"),
)


@dataclass
class ReviewIssue:
    """A single issue found in the PR."""

    file: str  # Path of the affected file
    line: int | None  # Line number in the new file, or None if unknown
    severity: str  # HIGH, MEDIUM, LOW
    category: str  # Security, Correctness, Performance, etc.
    description: str  # What is wrong
    recommendation: str  # Suggested fix
    code_snippet: str | None = None  # Optional excerpt of the offending code


@dataclass
class PRReviewResult:
    """Result of a PR review."""

    summary: str  # Free-text summary of the review
    issues: list[ReviewIssue]  # Findings reported by the AI review
    overall_severity: str  # Severity reported for the PR as a whole
    approval: bool  # True when the review recommends approval
    # Findings produced by the regex security scan
    security_issues: list[ReviewIssue] = field(default_factory=list)


class PRAgent(BaseAgent):
    """Agent for handling pull request reviews."""

    # Marker specific to PR reviews
    # NOTE(review): both markers are empty strings in this file. An empty
    # marker is a substring of every comment body, so marker-based lookup
    # cannot single out the AI comment - confirm the intended values.
    PR_AI_MARKER = ""
    PR_SUMMARY_MARKER = ""

    # ------------------------------------------------------------------
    # Configuration helpers
    # ------------------------------------------------------------------

    def _get_label_config(self, category: str, key: str) -> dict:
        """Get full label configuration from config.

        Args:
            category: Label category (type, priority, status)
            key: Label key within category (bug, high, ai_approved, etc.)

        Returns:
            Dict with name, color, description, aliases
        """
        labels_config = self.config.get("labels", {})
        category_config = labels_config.get(category, {})
        label_config = category_config.get(key, {})

        # Handle old string format
        if isinstance(label_config, str):
            return {
                "name": label_config,
                "color": "1d76db",  # Default blue
                "description": "",
                "aliases": [],
            }

        # Anything that is neither a string nor a mapping (e.g. an explicit
        # null in the config file) is treated like a missing entry.
        if not isinstance(label_config, dict):
            label_config = {}

        # Handle new dict format
        return {
            "name": label_config.get("name", ""),
            "color": label_config.get("color", "1d76db"),
            "description": label_config.get("description", ""),
            "aliases": label_config.get("aliases", []),
        }

    def _detect_command(self, comment_body: str | None) -> str | None:
        """Return the bot command contained in a comment, if any.

        Both the comment and the configured mention prefix are lower-cased
        before matching, so a prefix configured with upper-case letters is
        still recognised.

        Args:
            comment_body: Raw text of the issue comment

        Returns:
            One of "summarize", "changelog", "review-again", or None when
            the comment contains no known command
        """
        prefix = self.config.get("interaction", {}).get("mention_prefix", "@codebot")
        prefix = str(prefix).lower()
        body = (comment_body or "").lower()
        for command in _COMMENT_COMMANDS:
            if f"{prefix} {command}" in body:
                return command
        return None

    # ------------------------------------------------------------------
    # Entry points
    # ------------------------------------------------------------------

    def can_handle(self, event_type: str, event_data: dict) -> bool:
        """Check if this agent handles the given event.

        Args:
            event_type: Webhook event name ("pull_request", "issue_comment")
            event_data: Webhook payload

        Returns:
            True when the agent is enabled and the event is either an allowed
            pull request action or a newly created PR comment containing one
            of the bot commands
        """
        # Check if agent is enabled
        agent_config = self.config.get("agents", {}).get("pr", {})
        if not agent_config.get("enabled", True):
            return False

        action = event_data.get("action", "")

        if event_type == "pull_request":
            allowed_events = agent_config.get("events", ["opened", "synchronize"])
            return action in allowed_events

        # Handle issue comments on PRs (review-again, summarize, changelog)
        if event_type == "issue_comment" and action == "created":
            # Only handle if this is a PR
            is_pr = event_data.get("issue", {}).get("pull_request") is not None
            comment_body = event_data.get("comment", {}).get("body", "")
            return is_pr and self._detect_command(comment_body) is not None

        return False

    def execute(self, context: AgentContext) -> AgentResult:
        """Execute the PR review agent.

        Comment events carrying a bot command are dispatched to the matching
        command handler. Every other event runs the full review: optional
        auto-summary, diff retrieval, security scan, AI review, inline
        comments, summary comment and labels.

        Args:
            context: Agent context with event type, payload, owner and repo

        Returns:
            AgentResult describing the review outcome and the actions taken
        """
        # Check if this is a comment-based command
        if context.event_type == "issue_comment":
            comment_body = context.event_data.get("comment", {}).get("body", "")
            handlers = {
                "summarize": self._handle_summarize_command,
                "changelog": self._handle_changelog_command,
                "review-again": self._handle_review_again,
            }
            command = self._detect_command(comment_body)
            if command is not None:
                return handlers[command](context)

        pr = context.event_data.get("pull_request", {})
        pr_number = pr.get("number")
        self.logger.info(f"Reviewing PR #{pr_number}: {pr.get('title')}")

        actions_taken = []
        agent_config = self.config.get("agents", {}).get("pr", {})

        # Check if PR has empty description and auto-summary is enabled.
        # The payload may carry an explicit null body, hence the "or".
        pr_body = (pr.get("body") or "").strip()
        auto_summary_enabled = agent_config.get("auto_summary", {}).get("enabled", True)
        if (
            not pr_body
            and auto_summary_enabled
            and context.event_data.get("action") == "opened"
        ):
            # Generate and post summary for empty PR descriptions
            summary_result = self._generate_pr_summary(
                context.owner, context.repo, pr_number
            )
            if summary_result:
                actions_taken.append("Generated PR summary for empty description")
            # Don't return here - continue with regular review

        # Step 1: Get PR diff
        diff = self._get_diff(context.owner, context.repo, pr_number)
        if not diff.strip():
            return AgentResult(
                success=True,
                message="PR has no changes to review",
            )

        # Step 2: Parse changed files
        changed_files = self._parse_diff_files(diff)

        # Step 3: Run security scan if enabled
        security_issues = []
        if agent_config.get("security_scan", True):
            security_issues = self._run_security_scan(changed_files, diff)
            if security_issues:
                actions_taken.append(f"Found {len(security_issues)} security issues")

        # Step 4: Run AI review
        review_result = self._run_ai_review(diff, context, security_issues)

        # Step 5: Post inline comments if enabled
        if agent_config.get("inline_comments", True) and review_result.issues:
            inline_count = self._post_inline_comments(
                context.owner, context.repo, pr_number, review_result
            )
            actions_taken.append(f"Posted {inline_count} inline comments")

        # Step 6: Post summary comment
        summary_comment = self._generate_summary_comment(review_result)
        self.upsert_comment(
            context.owner,
            context.repo,
            pr_number,
            summary_comment,
            marker=self.PR_AI_MARKER,
        )
        actions_taken.append("Posted summary comment")

        # Step 7: Apply labels
        labels_applied = self._apply_review_labels(
            context.owner, context.repo, pr_number, review_result
        )
        if labels_applied:
            actions_taken.append(f"Applied labels: {labels_applied}")

        return AgentResult(
            success=True,
            message=f"Reviewed PR #{pr_number}: {review_result.overall_severity} severity",
            data={
                "severity": review_result.overall_severity,
                "approval": review_result.approval,
                "issues_count": len(review_result.issues),
                "security_issues_count": len(review_result.security_issues),
            },
            actions_taken=actions_taken,
        )

    # ------------------------------------------------------------------
    # Diff handling
    # ------------------------------------------------------------------

    def _get_diff(self, owner: str, repo: str, pr_number: int) -> str:
        """Get the PR diff, truncated if necessary.

        Args:
            owner: Repository owner
            repo: Repository name
            pr_number: PR number

        Returns:
            The diff text limited to the configured "review.max_diff_lines"
            (default 800) lines, or an empty string when the diff could not
            be fetched
        """
        max_lines = self.config.get("review", {}).get("max_diff_lines", 800)

        try:
            diff = self.gitea.get_pull_request_diff(owner, repo, pr_number)
            lines = diff.splitlines()
            if len(lines) > max_lines:
                return "\n".join(lines[:max_lines])
            return diff
        except Exception as e:
            self.logger.error(f"Failed to get diff: {e}")
            return ""

    @staticmethod
    def _path_from_diff_header(header: str) -> str | None:
        """Extract the new-side path from a "diff --git a/... b/..." line.

        Args:
            header: A line starting with "diff --git"

        Returns:
            The path following "b/", or None when the header is malformed
        """
        # Unrenamed files repeat the same path on both sides; matching it
        # with a back-reference is unambiguous even when the path itself
        # contains "b/" (e.g. "lib/x.py") or spaces.
        same_path = re.match(r"diff --git a/(.+) b/\1$", header)
        if same_path:
            return same_path.group(1)

        # Renames: take everything after the " b/" separator.
        renamed = re.search(r" b/(.+)$", header)
        return renamed.group(1) if renamed else None

    def _parse_diff_files(self, diff: str) -> dict[str, str]:
        """Parse diff into file -> content mapping.

        Args:
            diff: Unified diff text containing "diff --git" file headers

        Returns:
            Mapping of file path to the diff lines that follow that file's
            "diff --git" header (the header itself is not included)
        """
        files: dict[str, str] = {}
        current_file = None
        current_content: list[str] = []

        for line in diff.splitlines():
            if line.startswith("diff --git"):
                if current_file:
                    files[current_file] = "\n".join(current_content)
                # A malformed header yields None, which drops the following
                # lines instead of attributing them to the previous file.
                current_file = self._path_from_diff_header(line)
                current_content = []
            elif current_file:
                current_content.append(line)

        if current_file:
            files[current_file] = "\n".join(current_content)

        return files

    @staticmethod
    def _added_lines(file_diff: str) -> list[tuple[int, str]]:
        """Collect the added lines of a single-file diff.

        Args:
            file_diff: Diff content of one file (everything after its
                "diff --git" header)

        Returns:
            List of (line number in the new file, line text without the
            leading "+") tuples
        """
        added: list[tuple[int, str]] = []
        current_line = 0
        in_hunk = False

        for line in file_diff.splitlines():
            if line.startswith("@@"):
                # Parse line number from @@ -x,y +a,b @@
                match = re.search(r"\+(\d+)", line)
                if match:
                    current_line = int(match.group(1)) - 1
                in_hunk = True
            elif not in_hunk:
                # File headers ("index", "---", "+++", ...) precede the
                # first hunk and carry no content lines.
                continue
            elif line.startswith("+"):
                # Inside a hunk every "+" line is an addition, including
                # ones whose content itself starts with "++".
                current_line += 1
                added.append((current_line, line[1:]))
            elif line.startswith(("-", "\\")):
                # Removed lines and "\ No newline at end of file" markers do
                # not exist in the new file.
                continue
            else:
                current_line += 1

        return added

    # ------------------------------------------------------------------
    # Review steps
    # ------------------------------------------------------------------

    def _run_security_scan(
        self, changed_files: dict[str, str], diff: str
    ) -> list[ReviewIssue]:
        """Run security pattern scanning on the diff.

        Only added lines are checked. One issue is reported per matching
        (line, pattern) pair.

        Args:
            changed_files: Mapping of file path to that file's diff content
            diff: Full diff text (currently unused; kept for interface
                compatibility)

        Returns:
            List of security issues found on added lines
        """
        issues = []

        for filename, content in changed_files.items():
            for line_num, line_content in self._added_lines(content):
                for pattern_def in _SECURITY_PATTERNS:
                    if pattern_def["regex"].search(line_content):
                        issues.append(
                            ReviewIssue(
                                file=filename,
                                line=line_num,
                                severity=pattern_def["severity"],
                                category=pattern_def["category"],
                                description=pattern_def["description"],
                                recommendation=pattern_def["recommendation"],
                                code_snippet=line_content.strip()[:100],
                            )
                        )

        return issues

    @staticmethod
    def _coerce_line(value: object) -> int | None:
        """Normalise a line number taken from LLM JSON output.

        Args:
            value: Raw "line" value (int, float, numeric string, None, ...)

        Returns:
            A positive int, or None when the value is missing or unusable
        """
        if isinstance(value, bool):
            return None
        if isinstance(value, float) and value.is_integer():
            value = int(value)
        if isinstance(value, str) and value.strip().isdigit():
            value = int(value.strip())
        if isinstance(value, int) and value > 0:
            return value
        return None

    def _run_ai_review(
        self,
        diff: str,
        context: AgentContext,
        security_issues: list[ReviewIssue],
    ) -> PRReviewResult:
        """Run AI-based code review.

        Args:
            diff: Diff text sent to the LLM
            context: Agent context (currently unused by this method)
            security_issues: Findings of the regex scan; the first five are
                added to the prompt and all are attached to the result

        Returns:
            The parsed review. If the LLM call or the parsing of its answer
            fails, a result with overall severity "UNKNOWN" and
            approval=False is returned instead of raising.
        """
        prompt_template = self.load_prompt("base")

        # Add security context if issues were found
        security_context = ""
        if security_issues:
            found = "".join(
                f"- [{issue.severity}] {issue.file}:{issue.line} - {issue.description}\n"
                for issue in security_issues[:5]  # Limit to first 5
            )
            security_context = f"\n\nSECURITY SCAN RESULTS (already detected):\n{found}"

        prompt = f"{prompt_template}\n{security_context}\nDIFF:\n{diff}"

        try:
            result = self.call_llm_json(prompt)

            issues = [
                ReviewIssue(
                    file=issue_data.get("file", "unknown"),
                    line=self._coerce_line(issue_data.get("line")),
                    severity=issue_data.get("severity", "MEDIUM"),
                    category=issue_data.get("category", "General"),
                    description=issue_data.get("description", ""),
                    recommendation=issue_data.get("recommendation", ""),
                    code_snippet=issue_data.get("code_snippet"),
                )
                for issue_data in result.get("issues", [])
            ]

            return PRReviewResult(
                summary=result.get("summary", "Review completed"),
                issues=issues,
                overall_severity=result.get("overall_severity", "LOW"),
                approval=result.get("approval", True),
                security_issues=security_issues,
            )
        except Exception as e:
            self.logger.error(f"AI review failed: {e}")
            return PRReviewResult(
                summary=f"AI review encountered an error: {e}",
                issues=[],
                overall_severity="UNKNOWN",
                approval=False,
                security_issues=security_issues,
            )

    def _post_inline_comments(
        self,
        owner: str,
        repo: str,
        pr_number: int,
        review: PRReviewResult,
    ) -> int:
        """Post inline comments for issues with line numbers.

        Args:
            owner: Repository owner
            repo: Repository name
            pr_number: PR number
            review: Review whose AI and security findings are posted

        Returns:
            Number of inline comments submitted (at most 10), or 0 when
            there was nothing to post or the request failed
        """
        comments = [
            {
                "path": issue.file,
                "line": issue.line,
                "body": (
                    f"**[{issue.severity}] {issue.category}**\n\n"
                    f"{issue.description}\n\n"
                    f"**Recommendation:** {issue.recommendation}"
                ),
            }
            for issue in review.issues + review.security_issues
            if issue.line and issue.file
        ]

        if not comments:
            return 0

        # Limit to 10 inline comments
        batch = comments[:_MAX_INLINE_COMMENTS]
        try:
            # Use Gitea's pull request review API for inline comments
            self.gitea.create_pull_request_review(
                owner=owner,
                repo=repo,
                index=pr_number,
                body="AI Code Review - Inline Comments",
                event="COMMENT",
                comments=batch,
            )
            return len(batch)
        except Exception as e:
            self.logger.warning(f"Failed to post inline comments: {e}")
            return 0

    # ------------------------------------------------------------------
    # Comment rendering helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _format_location(file: str, line: int | None) -> str:
        """Render a finding location as `file:line`, or `file` without a line."""
        return f"`{file}:{line}`" if line else f"`{file}`"

    @staticmethod
    def _severity_table(issues: list[ReviewIssue]) -> list[str]:
        """Render the HIGH/MEDIUM/LOW count table as markdown lines."""
        counts = {"HIGH": 0, "MEDIUM": 0, "LOW": 0}
        for issue in issues:
            if issue.severity in counts:
                counts[issue.severity] += 1
        return [
            "| Severity | Count |",
            "|----------|-------|",
            f"| HIGH | {counts['HIGH']} |",
            f"| MEDIUM | {counts['MEDIUM']} |",
            f"| LOW | {counts['LOW']} |",
            "",
        ]

    def _finding_bullets(self, findings: list[dict], limit: int = 5) -> list[str]:
        """Render finding dicts as markdown bullets, capped at ``limit``."""
        bullets = [
            f"- **[{finding['severity']}]** "
            f"{self._format_location(finding['file'], finding['line'])}"
            f" - {finding['description']}"
            for finding in findings[:limit]
        ]
        if len(findings) > limit:
            bullets.append(f"- ... and {len(findings) - limit} more")
        return bullets

    def _generate_summary_comment(self, review: PRReviewResult) -> str:
        """Generate the summary comment for the PR.

        Args:
            review: Review result to render

        Returns:
            Markdown with the review summary, a severity table, up to five
            security issues, up to ten other findings and the verdict
        """
        lines = [
            f"{self.AI_DISCLAIMER}",
            "",
            "## AI Code Review",
            "",
            review.summary,
            "",
        ]

        # Statistics
        lines.append("### Summary")
        lines.append("")
        lines.extend(self._severity_table(review.issues + review.security_issues))

        # Security issues section
        if review.security_issues:
            lines.append("### Security Issues")
            lines.append("")
            for issue in review.security_issues[:5]:
                loc = self._format_location(issue.file, issue.line)
                lines.append(f"- **[{issue.severity}]** {loc} - {issue.description}")
            lines.append("")

        # Other issues (limit display)
        other_issues = [i for i in review.issues if i not in review.security_issues]
        if other_issues:
            lines.append("### Review Findings")
            lines.append("")
            for issue in other_issues[:10]:
                loc = self._format_location(issue.file, issue.line)
                lines.append(f"- **[{issue.severity}]** {loc} - {issue.description}")
            if len(other_issues) > 10:
                lines.append(f"- ...and {len(other_issues) - 10} more issues")
            lines.append("")

        # Verdict
        lines.append("---")
        lines.append(f"**Overall Severity:** `{review.overall_severity}`")
        if review.approval:
            lines.append("**AI Recommendation:** Approve")
        else:
            lines.append("**AI Recommendation:** Changes Requested")

        return "\n".join(lines)

    def _apply_review_labels(
        self,
        owner: str,
        repo: str,
        pr_number: int,
        review: PRReviewResult,
    ) -> list[str]:
        """Apply labels based on review result.

        Adds the configured "ai_approved" or "ai_changes_required" status
        label, provided a label with that name exists in the repository.

        Args:
            owner: Repository owner
            repo: Repository name
            pr_number: PR number
            review: Review result that decides which label is used

        Returns:
            Names of the labels that were added; empty when no matching
            label exists or a Gitea call failed
        """
        try:
            repo_labels = self.gitea.get_repo_labels(owner, repo)
            label_map = {label["name"]: label["id"] for label in repo_labels}
        except Exception as e:
            self.logger.warning(f"Failed to get repo labels: {e}")
            return []

        # Add approval/changes required label
        # Use helper to support both old string and new dict format
        status_key = "ai_approved" if review.approval else "ai_changes_required"
        label_name = self._get_label_config("status", status_key).get("name", "")
        if not label_name or label_name not in label_map:
            return []

        try:
            self.gitea.add_issue_labels(
                owner, repo, pr_number, [label_map[label_name]]
            )
        except Exception as e:
            self.logger.warning(f"Failed to add labels: {e}")
            return []

        return [label_name]

    # ------------------------------------------------------------------
    # "review-again" command
    # ------------------------------------------------------------------

    def _handle_review_again(self, context: AgentContext) -> AgentResult:
        """Re-run PR review on current state.

        Runs the security scan and AI review again, compares the findings
        with those parsed from the previous AI review comment and writes the
        updated review.

        Args:
            context: Agent context with the issue_comment event data

        Returns:
            AgentResult with the new review outcome and resolved/new counts
        """
        issue = context.event_data.get("issue", {})
        pr_number = issue.get("number")
        comment_author = (
            context.event_data.get("comment", {}).get("user", {}).get("login", "user")
        )

        self.logger.info(f"Re-reviewing PR #{pr_number} at user request")

        # Get previous review comment
        previous_comment = self._find_previous_review(
            context.owner, context.repo, pr_number
        )
        previous_findings = []
        if previous_comment:
            previous_findings = self._parse_review_comment(previous_comment)

        # Run new review (reuse existing review logic)
        actions_taken = []

        # Step 1: Get PR diff
        diff = self._get_diff(context.owner, context.repo, pr_number)
        if not diff.strip():
            response = (
                f"@{comment_author}\n\n{self.AI_DISCLAIMER}\n\n"
                "**🔄 Re-review Requested**\n\nPR has no changes to review."
            )
            self.gitea.create_issue_comment(
                context.owner, context.repo, pr_number, response
            )
            return AgentResult(
                success=True,
                message="PR has no changes to review",
            )

        # Step 2: Parse changed files
        changed_files = self._parse_diff_files(diff)

        # Step 3: Run security scan if enabled
        security_issues = []
        agent_config = self.config.get("agents", {}).get("pr", {})
        if agent_config.get("security_scan", True):
            security_issues = self._run_security_scan(changed_files, diff)

        # Step 4: Run AI review
        review_result = self._run_ai_review(diff, context, security_issues)

        # Step 5: Compare with previous review
        current_findings = self._extract_findings_from_review(review_result)
        diff_result = self._compare_reviews(previous_findings, current_findings)

        # Step 6: Generate updated review with comparison
        updated_review = self._format_review_update(
            review_result, diff_result, comment_author
        )

        # Step 7: Update existing comment (or create new one)
        self.upsert_comment(
            context.owner,
            context.repo,
            pr_number,
            updated_review,
            marker=self.PR_AI_MARKER,
        )
        actions_taken.append("Updated review comment")

        # Step 8: Update PR labels
        labels_applied = self._apply_review_labels(
            context.owner, context.repo, pr_number, review_result
        )
        if labels_applied:
            actions_taken.append(f"Updated labels: {labels_applied}")

        return AgentResult(
            success=True,
            message=f"Re-reviewed PR #{pr_number}: {review_result.overall_severity} severity",
            data={
                "severity": review_result.overall_severity,
                "approval": review_result.approval,
                "issues_count": len(review_result.issues),
                "security_issues_count": len(review_result.security_issues),
                "resolved_count": len(diff_result.get("resolved", [])),
                "new_count": len(diff_result.get("new", [])),
            },
            actions_taken=actions_taken,
        )

    def _find_previous_review(
        self, owner: str, repo: str, pr_number: int
    ) -> str | None:
        """Find the previous AI review comment.

        Args:
            owner: Repository owner
            repo: Repository name
            pr_number: PR number

        Returns:
            Body of the comment located via PR_AI_MARKER, or None when no
            such comment exists
        """
        comment_id = self.find_ai_comment(
            owner, repo, pr_number, marker=self.PR_AI_MARKER
        )
        if not comment_id:
            return None

        # Get the comment content
        comments = self.gitea.list_issue_comments(owner, repo, pr_number)
        for comment in comments:
            if comment.get("id") == comment_id:
                return comment.get("body", "")

        return None

    def _parse_review_comment(self, comment_text: str) -> list[dict]:
        """Parse previous review comment to extract findings.

        Only the part after the "### Summary" heading is scanned. In a
        re-review comment the "Resolved" / "New Issues" change lists come
        before that heading and use the same bullet format; reading them
        would report already-resolved findings as still open.

        Args:
            comment_text: Markdown body of a previous AI review comment

        Returns:
            List of findings with file, line, severity, description. ``line``
            is 0 for findings that were rendered without a line number.
        """
        if not comment_text:
            return []

        _, heading, after_summary = comment_text.partition("\n### Summary")
        searchable = after_summary if heading else comment_text

        # Look for patterns like: **[HIGH]** `src/file.py:45` - Description
        return [
            {
                "severity": match.group(1),
                "file": match.group(2),
                "line": int(match.group(3)) if match.group(3) else 0,
                "description": match.group(4).strip(),
            }
            for match in _FINDING_LINE_RE.finditer(searchable)
        ]

    def _extract_findings_from_review(self, review: PRReviewResult) -> list[dict]:
        """Extract findings from PRReviewResult into comparable format.

        Args:
            review: Review result holding AI and security findings

        Returns:
            One dict per finding with severity, file, line (0 when unknown),
            description and category
        """
        return [
            {
                "severity": issue.severity,
                "file": issue.file,
                "line": issue.line or 0,
                "description": issue.description,
                "category": issue.category,
            }
            for issue in review.issues + review.security_issues
        ]

    def _finding_key(self, finding: dict) -> str:
        """Create unique key for a finding.

        Args:
            finding: Finding dict with file, line and description

        Returns:
            "file:line:description-prefix" string used to match findings
            between two reviews
        """
        file_path = finding.get("file", "unknown")
        line = finding.get("line", 0)
        # Parsed findings only ever carry the first, stripped line of a
        # description, so reduce every description to that form before
        # taking the 50-character prefix used for matching.
        description = (finding.get("description") or "").strip()
        first_line = description.split("\n", 1)[0].strip()
        return f"{file_path}:{line}:{first_line[:50]}"

    def _compare_reviews(
        self, previous_findings: list[dict], new_findings: list[dict]
    ) -> dict:
        """Compare previous and new review to show what changed.

        Args:
            previous_findings: Findings parsed from the previous comment
            new_findings: Findings of the current review

        Returns:
            {
                "resolved": [...],  # Issues that disappeared
                "new": [...],  # New issues found
                "still_present": [...],  # Issues that remain
                "severity_changed": {...}  # OLD severity -> NEW severity
            }
        """
        # Create lookup keys (file:line:description)
        prev_keys = {self._finding_key(f): f for f in previous_findings}
        new_keys = {self._finding_key(f): f for f in new_findings}

        resolved = [f for key, f in prev_keys.items() if key not in new_keys]
        new = [f for key, f in new_keys.items() if key not in prev_keys]
        still_present = [f for key, f in new_keys.items() if key in prev_keys]

        severity_changed = {}
        for key, previous in prev_keys.items():
            current = new_keys.get(key)
            if current is None:
                continue
            prev_severity = previous.get("severity")
            new_severity = current.get("severity")
            if prev_severity != new_severity:
                severity_changed[key] = {
                    "old": prev_severity,
                    "new": new_severity,
                    "finding": current,
                }

        return {
            "resolved": resolved,
            "new": new,
            "still_present": still_present,
            "severity_changed": severity_changed,
        }

    def _format_review_update(
        self, review: PRReviewResult, diff: dict, comment_author: str
    ) -> str:
        """Format review with comparison to previous run.

        Args:
            review: Current review result
            diff: Comparison produced by ``_compare_reviews``
            comment_author: Login of the user who requested the re-review

        Returns:
            Markdown comment with the change summary, severity table,
            current findings and verdict
        """
        lines = [f"@{comment_author}\n"]
        lines.append(f"{self.AI_DISCLAIMER}\n")
        lines.append("**🔄 Re-review Requested**\n")
        lines.append("---\n")
        lines.append("## AI Code Review (Updated)\n")

        # Summary of changes
        prev_total = len(diff["resolved"]) + len(diff["still_present"])
        curr_total = len(diff["new"]) + len(diff["still_present"])

        if prev_total > 0:
            lines.append(f"**Previous Review:** {prev_total} issues")
            lines.append(f"**Current Review:** {curr_total} issues\n")
        else:
            lines.append("**First Review** - No previous review found\n")

        # Changes section (only if there was a previous review)
        if prev_total > 0:
            lines.append("### Changes from Previous Review\n")

            if diff["resolved"]:
                lines.append(f"**✅ Resolved ({len(diff['resolved'])}):**")
                lines.extend(self._finding_bullets(diff["resolved"]))  # Show max 5
                lines.append("")

            if diff["new"]:
                lines.append(f"**⚠️ New Issues ({len(diff['new'])}):**")
                lines.extend(self._finding_bullets(diff["new"]))
                lines.append("")

            if diff["severity_changed"]:
                lines.append(
                    f"**🔄 Severity Changed ({len(diff['severity_changed'])}):**"
                )
                for change in list(diff["severity_changed"].values())[:5]:
                    finding = change["finding"]
                    loc = self._format_location(finding["file"], finding["line"])
                    lines.append(f"- {loc} - {change['old']} → {change['new']}")
                lines.append("")

        # Summary table
        lines.append("### Summary\n")
        lines.extend(self._severity_table(review.issues + review.security_issues))

        # Security issues section (if any)
        if review.security_issues:
            lines.append("### Security Issues\n")
            for issue in review.security_issues[:5]:
                loc = self._format_location(issue.file, issue.line)
                lines.append(f"- **[{issue.severity}]** {loc} - {issue.description}")
            if len(review.security_issues) > 5:
                lines.append(f"- ... and {len(review.security_issues) - 5} more")
            lines.append("")

        # Other issues (limit display)
        other_issues = [i for i in review.issues if i not in review.security_issues]
        if other_issues:
            lines.append("### Review Findings\n")
            for issue in other_issues[:10]:
                loc = self._format_location(issue.file, issue.line)
                lines.append(f"- **[{issue.severity}]** {loc} - {issue.description}")
            if len(other_issues) > 10:
                lines.append(f"- ... and {len(other_issues) - 10} more issues")
            lines.append("")

        # Verdict
        lines.append("---")
        lines.append(f"**Overall Severity:** `{review.overall_severity}`")
        if review.approval:
            lines.append("**AI Recommendation:** Approved ✅")
        else:
            lines.append("**AI Recommendation:** Changes Requested ⚠️")

        return "\n".join(lines)

    # ------------------------------------------------------------------
    # PR summary ("summarize" command and auto-summary)
    # ------------------------------------------------------------------

    def _generate_pr_summary(self, owner: str, repo: str, pr_number: int) -> bool:
        """Generate and post a summary for a PR.

        Args:
            owner: Repository owner
            repo: Repository name
            pr_number: PR number

        Returns:
            True if summary was generated successfully, False otherwise
        """
        try:
            # Get PR diff
            diff = self._get_diff(owner, repo, pr_number)
            if not diff.strip():
                self.logger.info(f"No diff to summarize for PR #{pr_number}")
                return False

            # Load summary prompt
            prompt_template = self.load_prompt("pr_summary")
            prompt = f"{prompt_template}\n{diff}"

            # Call LLM to generate summary
            result = self.call_llm_json(prompt)

            # Format the summary comment
            summary_comment = self._format_pr_summary(result)

            # Post as first comment (or update PR description based on config)
            agent_config = self.config.get("agents", {}).get("pr", {})
            auto_summary_config = agent_config.get("auto_summary", {})
            post_as_comment = auto_summary_config.get("post_as_comment", True)

            if post_as_comment:
                # Post as comment
                self.gitea.create_issue_comment(owner, repo, pr_number, summary_comment)
                self.logger.info(f"Posted PR summary as comment for PR #{pr_number}")
            else:
                # Update PR description (requires different API call)
                # Note: Gitea API may not support updating PR description
                # In that case, fall back to posting as comment
                try:
                    self.gitea.update_pull_request(
                        owner, repo, pr_number, body=summary_comment
                    )
                    self.logger.info(
                        f"Updated PR description with summary for PR #{pr_number}"
                    )
                except Exception as e:
                    self.logger.warning(
                        f"Could not update PR description, posting as comment: {e}"
                    )
                    self.gitea.create_issue_comment(
                        owner, repo, pr_number, summary_comment
                    )

            return True

        except Exception as e:
            self.logger.error(f"Failed to generate PR summary: {e}")
            return False

    def _format_pr_summary(self, summary_data: dict) -> str:
        """Format the PR summary data into a readable comment.

        Args:
            summary_data: JSON data from LLM containing summary information

        Returns:
            Formatted markdown comment
        """
        lines = [
            self.AI_DISCLAIMER,
            "",
            "## 📋 Pull Request Summary",
            "",
            summary_data.get("summary", "Summary unavailable"),
            "",
        ]

        # Change type
        change_type = summary_data.get("change_type", "Unknown")
        change_type_emoji = {
            "Feature": "✨",
            "Bugfix": "🐛",
            "Refactor": "♻️",
            "Documentation": "📚",
            "Testing": "🧪",
            "Mixed": "🔀",
        }
        emoji = change_type_emoji.get(change_type, "🔀")
        lines.append(f"**Type:** {emoji} {change_type}")
        lines.append("")

        # Key changes
        key_changes = summary_data.get("key_changes", {})
        if key_changes:
            lines.append("## Changes")
            lines.append("")

            change_groups = (
                ("added", "**✅ Added:**"),
                ("modified", "**📝 Modified:**"),
                ("removed", "**❌ Removed:**"),
            )
            for key, heading in change_groups:
                items = key_changes.get(key, [])
                if items:
                    lines.append(heading)
                    lines.extend(f"- {item}" for item in items)
                    lines.append("")

        # Files affected
        files = summary_data.get("files_affected", [])
        if files:
            lines.append("## Files Affected")
            lines.append("")
            type_icon = {"added": "➕", "modified": "📝", "deleted": "➖"}
            for file_info in files[:10]:  # Limit to first 10 files
                if not isinstance(file_info, dict):
                    # Tolerate a bare path where an object was expected
                    file_info = {"path": str(file_info)}
                path = file_info.get("path", "unknown")
                desc = file_info.get("description", "")
                icon = type_icon.get(file_info.get("change_type", "modified"), "📝")
                lines.append(f"- {icon} `{path}` - {desc}")

            if len(files) > 10:
                lines.append(f"- ... and {len(files) - 10} more files")
            lines.append("")

        # Impact assessment
        impact = summary_data.get("impact", {})
        if impact:
            # str() guards against a null or non-string scope in the payload
            scope = str(impact.get("scope", "unknown"))
            description = impact.get("description", "")
            scope_emoji = {"small": "🟢", "medium": "🟡", "large": "🔴"}
            emoji = scope_emoji.get(scope, "⚪")

            lines.append("## Impact")
            lines.append(f"{emoji} **Scope:** {scope.capitalize()}")
            lines.append(f"{description}")

        return "\n".join(lines)

    def _handle_summarize_command(self, context: AgentContext) -> AgentResult:
        """Handle @codebot summarize command from PR comments.

        Args:
            context: Agent context with event data

        Returns:
            AgentResult with success status and actions taken
        """
        issue = context.event_data.get("issue", {})
        pr_number = issue.get("number")
        comment_author = (
            context.event_data.get("comment", {}).get("user", {}).get("login", "user")
        )

        self.logger.info(f"Generating PR summary for PR #{pr_number} at user request")

        # Generate and post summary
        if self._generate_pr_summary(context.owner, context.repo, pr_number):
            return AgentResult(
                success=True,
                message=f"Generated PR summary for PR #{pr_number}",
                actions_taken=["Posted PR summary comment"],
            )

        # Post error message
        error_msg = (
            f"@{comment_author}\n\n"
            f"{self.AI_DISCLAIMER}\n\n"
            "**⚠️ Summary Generation Failed**\n\n"
            "I was unable to generate a summary for this PR. "
            "This could be because:\n"
            "- The PR has no changes\n"
            "- There was an error accessing the diff\n"
            "- The LLM service is unavailable"
        )
        self.gitea.create_issue_comment(
            context.owner, context.repo, pr_number, error_msg
        )

        return AgentResult(
            success=False,
            message=f"Failed to generate PR summary for PR #{pr_number}",
            error="Summary generation failed",
        )

    # ------------------------------------------------------------------
    # "changelog" command
    # ------------------------------------------------------------------

    def _handle_changelog_command(self, context: AgentContext) -> AgentResult:
        """Handle @codebot changelog command from PR comments.

        Generates Keep a Changelog format entries for the PR.

        Args:
            context: Agent context with event data

        Returns:
            AgentResult with success status and actions taken
        """
        issue = context.event_data.get("issue", {})
        pr_number = issue.get("number")
        comment_author = (
            context.event_data.get("comment", {}).get("user", {}).get("login", "user")
        )

        self.logger.info(f"Generating changelog for PR #{pr_number} at user request")

        try:
            # Get PR data
            pr = self.gitea.get_pull_request(context.owner, context.repo, pr_number)
            pr_title = pr.get("title", "")
            pr_description = pr.get("body", "")

            # Get PR diff
            diff = self._get_diff(context.owner, context.repo, pr_number)
            if not diff.strip():
                error_msg = (
                    f"@{comment_author}\n\n"
                    f"{self.AI_DISCLAIMER}\n\n"
                    "**⚠️ Changelog Generation Failed**\n\n"
                    "No changes found in this PR to analyze."
                )
                self.gitea.create_issue_comment(
                    context.owner, context.repo, pr_number, error_msg
                )
                return AgentResult(
                    success=False,
                    message=f"No diff to generate changelog for PR #{pr_number}",
                )

            # Load changelog prompt
            prompt_template = self.load_prompt("changelog")
            prompt = prompt_template.format(
                pr_title=pr_title,
                pr_description=pr_description or "(No description provided)",
            )
            prompt = f"{prompt}\n{diff}"

            # Call LLM to generate changelog
            result = self.call_llm_json(prompt)

            # Format the changelog comment
            changelog_comment = self._format_changelog(result, pr_number)

            # Post changelog comment
            self.gitea.create_issue_comment(
                context.owner, context.repo, pr_number, changelog_comment
            )

            return AgentResult(
                success=True,
                message=f"Generated changelog for PR #{pr_number}",
                actions_taken=["Posted changelog comment"],
            )

        except Exception as e:
            self.logger.error(f"Failed to generate changelog: {e}")

            # Post error message
            error_msg = (
                f"@{comment_author}\n\n"
                f"{self.AI_DISCLAIMER}\n\n"
                "**⚠️ Changelog Generation Failed**\n\n"
                f"I encountered an error while generating the changelog: {str(e)}\n\n"
                "This could be due to:\n"
                "- The PR is too large to analyze\n"
                "- The LLM service is temporarily unavailable\n"
                "- An unexpected error occurred"
            )
            try:
                self.gitea.create_issue_comment(
                    context.owner, context.repo, pr_number, error_msg
                )
            except Exception as post_error:
                # The failure may have been Gitea itself; still return a
                # result instead of letting the handler raise.
                self.logger.warning(
                    f"Failed to post changelog error comment: {post_error}"
                )

            return AgentResult(
                success=False,
                message=f"Failed to generate changelog for PR #{pr_number}",
                error=str(e),
            )

    def _format_changelog(self, changelog_data: dict, pr_number: int) -> str:
        """Format changelog data into Keep a Changelog format.

        Args:
            changelog_data: JSON data from LLM containing changelog entries
            pr_number: PR number for reference

        Returns:
            Formatted markdown changelog
        """
        lines = [
            self.AI_DISCLAIMER,
            "",
            f"## 📋 Changelog for PR #{pr_number}",
            "",
        ]

        changelog = changelog_data.get("changelog", {})

        # Added / Changed / Deprecated / Removed / Fixed / Security
        for key, heading in _CHANGELOG_SECTIONS:
            entries = changelog.get(key, [])
            if entries:
                lines.append(heading)
                lines.extend(f"- {item}" for item in entries)
                lines.append("")

        # Breaking changes
        breaking = changelog_data.get("breaking_changes", [])
        if breaking:
            lines.append("---")
            lines.append("")
            lines.append("### ⚠️ BREAKING CHANGES")
            lines.extend(f"- **{item}**" for item in breaking)
            lines.append("")

        # Technical details
        tech = changelog_data.get("technical_details", {})
        if tech:
            lines.append("---")
            lines.append("")
            lines.append("### 📊 Technical Details")

            files = tech.get("files_changed", 0)
            additions = tech.get("insertions", 0)
            deletions = tech.get("deletions", 0)
            lines.append(f"- **Files changed:** {files}")
            lines.append(f"- **Lines:** +{additions} / -{deletions}")

            components = tech.get("main_components", [])
            if components:
                # str() so a non-string component cannot break the join
                joined = ", ".join(str(component) for component in components)
                lines.append(f"- **Main components:** {joined}")

        return "\n".join(lines)