"""Pull Request Review Agent Enhanced AI agent for comprehensive PR reviews with inline comments, security scanning, and automatic label management. """ import re from dataclasses import dataclass, field from agents.base_agent import AgentContext, AgentResult, BaseAgent @dataclass class ReviewIssue: """A single issue found in the PR.""" file: str line: int | None severity: str # HIGH, MEDIUM, LOW category: str # Security, Correctness, Performance, etc. description: str recommendation: str code_snippet: str | None = None @dataclass class PRReviewResult: """Result of a PR review.""" summary: str issues: list[ReviewIssue] overall_severity: str approval: bool security_issues: list[ReviewIssue] = field(default_factory=list) class PRAgent(BaseAgent): """Agent for handling pull request reviews.""" # Marker specific to PR reviews PR_AI_MARKER = "" def _get_label_config(self, category: str, key: str) -> dict: """Get full label configuration from config. Args: category: Label category (type, priority, status) key: Label key within category (bug, high, ai_approved, etc.) Returns: Dict with name, color, description, aliases """ labels_config = self.config.get("labels", {}) category_config = labels_config.get(category, {}) label_config = category_config.get(key, {}) # Handle old string format if isinstance(label_config, str): return { "name": label_config, "color": "1d76db", # Default blue "description": "", "aliases": [], } # Handle new dict format return { "name": label_config.get("name", ""), "color": label_config.get("color", "1d76db"), "description": label_config.get("description", ""), "aliases": label_config.get("aliases", []), } def can_handle(self, event_type: str, event_data: dict) -> bool: """Check if this agent handles the given event.""" # Check if agent is enabled agent_config = self.config.get("agents", {}).get("pr", {}) if not agent_config.get("enabled", True): return False if event_type == "pull_request": action = event_data.get("action", "") allowed_events = agent_config.get("events", ["opened", "synchronize"]) return action in allowed_events return False def execute(self, context: AgentContext) -> AgentResult: """Execute the PR review agent.""" pr = context.event_data.get("pull_request", {}) pr_number = pr.get("number") self.logger.info(f"Reviewing PR #{pr_number}: {pr.get('title')}") actions_taken = [] # Step 1: Get PR diff diff = self._get_diff(context.owner, context.repo, pr_number) if not diff.strip(): return AgentResult( success=True, message="PR has no changes to review", ) # Step 2: Parse changed files changed_files = self._parse_diff_files(diff) # Step 3: Run security scan if enabled security_issues = [] agent_config = self.config.get("agents", {}).get("pr", {}) if agent_config.get("security_scan", True): security_issues = self._run_security_scan(changed_files, diff) if security_issues: actions_taken.append(f"Found {len(security_issues)} security issues") # Step 4: Run AI review review_result = self._run_ai_review(diff, context, security_issues) # Step 5: Post inline comments if enabled if agent_config.get("inline_comments", True) and review_result.issues: inline_count = self._post_inline_comments( context.owner, context.repo, pr_number, review_result ) actions_taken.append(f"Posted {inline_count} inline comments") # Step 6: Post summary comment summary_comment = self._generate_summary_comment(review_result) self.upsert_comment( context.owner, context.repo, pr_number, summary_comment, marker=self.PR_AI_MARKER, ) actions_taken.append("Posted summary comment") # Step 7: Apply labels labels_applied = self._apply_review_labels( context.owner, context.repo, pr_number, review_result ) if labels_applied: actions_taken.append(f"Applied labels: {labels_applied}") return AgentResult( success=True, message=f"Reviewed PR #{pr_number}: {review_result.overall_severity} severity", data={ "severity": review_result.overall_severity, "approval": review_result.approval, "issues_count": len(review_result.issues), "security_issues_count": len(review_result.security_issues), }, actions_taken=actions_taken, ) def _get_diff(self, owner: str, repo: str, pr_number: int) -> str: """Get the PR diff, truncated if necessary.""" max_lines = self.config.get("review", {}).get("max_diff_lines", 800) try: diff = self.gitea.get_pull_request_diff(owner, repo, pr_number) lines = diff.splitlines() if len(lines) > max_lines: return "\n".join(lines[:max_lines]) return diff except Exception as e: self.logger.error(f"Failed to get diff: {e}") return "" def _parse_diff_files(self, diff: str) -> dict[str, str]: """Parse diff into file -> content mapping.""" files = {} current_file = None current_content = [] for line in diff.splitlines(): if line.startswith("diff --git"): if current_file: files[current_file] = "\n".join(current_content) # Extract file path from "diff --git a/path b/path" match = re.search(r"b/(.+)$", line) if match: current_file = match.group(1) current_content = [] elif current_file: current_content.append(line) if current_file: files[current_file] = "\n".join(current_content) return files def _run_security_scan( self, changed_files: dict[str, str], diff: str ) -> list[ReviewIssue]: """Run security pattern scanning on the diff.""" issues = [] # Security patterns to detect patterns = [ { "name": "Hardcoded Secrets", "pattern": r'(?i)(api_key|apikey|secret|password|token|auth)\s*[=:]\s*["\'][^"\']{8,}["\']', "severity": "HIGH", "category": "Security", "description": "Potential hardcoded secret or API key detected", "recommendation": "Move secrets to environment variables or a secrets manager", }, { "name": "SQL Injection", "pattern": r'(?i)(execute|query)\s*\([^)]*\+[^)]*\)|f["\'].*\{.*\}.*(?:SELECT|INSERT|UPDATE|DELETE)', "severity": "HIGH", "category": "Security", "description": "Potential SQL injection vulnerability - string concatenation in query", "recommendation": "Use parameterized queries or prepared statements", }, { "name": "Hardcoded IP", "pattern": r"\b(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b", "severity": "LOW", "category": "Security", "description": "Hardcoded IP address detected", "recommendation": "Consider using configuration or DNS names instead", }, { "name": "Eval Usage", "pattern": r"\beval\s*\(", "severity": "HIGH", "category": "Security", "description": "Use of eval() detected - potential code injection risk", "recommendation": "Avoid eval() - use safer alternatives like ast.literal_eval() for Python", }, { "name": "Shell Injection", "pattern": r"(?i)(?:subprocess\.call|os\.system|shell\s*=\s*True)", "severity": "MEDIUM", "category": "Security", "description": "Potential shell command execution - verify input is sanitized", "recommendation": "Use subprocess with shell=False and pass arguments as a list", }, ] for filename, content in changed_files.items(): # Only check added lines (starting with +) added_lines = [] line_numbers = [] current_line = 0 for line in content.splitlines(): if line.startswith("@@"): # Parse line number from @@ -x,y +a,b @@ match = re.search(r"\+(\d+)", line) if match: current_line = int(match.group(1)) - 1 elif line.startswith("+") and not line.startswith("+++"): current_line += 1 added_lines.append((current_line, line[1:])) elif not line.startswith("-"): current_line += 1 # Check patterns on added lines for line_num, line_content in added_lines: for pattern_def in patterns: if re.search(pattern_def["pattern"], line_content): issues.append( ReviewIssue( file=filename, line=line_num, severity=pattern_def["severity"], category=pattern_def["category"], description=pattern_def["description"], recommendation=pattern_def["recommendation"], code_snippet=line_content.strip()[:100], ) ) return issues def _run_ai_review( self, diff: str, context: AgentContext, security_issues: list[ReviewIssue], ) -> PRReviewResult: """Run AI-based code review.""" prompt_template = self.load_prompt("base") # Add security context if issues were found security_context = "" if security_issues: security_context = "\n\nSECURITY SCAN RESULTS (already detected):\n" for issue in security_issues[:5]: # Limit to first 5 security_context += f"- [{issue.severity}] {issue.file}:{issue.line} - {issue.description}\n" prompt = f"{prompt_template}\n{security_context}\nDIFF:\n{diff}" try: result = self.call_llm_json(prompt) issues = [] for issue_data in result.get("issues", []): issues.append( ReviewIssue( file=issue_data.get("file", "unknown"), line=issue_data.get("line"), severity=issue_data.get("severity", "MEDIUM"), category=issue_data.get("category", "General"), description=issue_data.get("description", ""), recommendation=issue_data.get("recommendation", ""), code_snippet=issue_data.get("code_snippet"), ) ) return PRReviewResult( summary=result.get("summary", "Review completed"), issues=issues, overall_severity=result.get("overall_severity", "LOW"), approval=result.get("approval", True), security_issues=security_issues, ) except Exception as e: self.logger.error(f"AI review failed: {e}") return PRReviewResult( summary=f"AI review encountered an error: {e}", issues=[], overall_severity="UNKNOWN", approval=False, security_issues=security_issues, ) def _post_inline_comments( self, owner: str, repo: str, pr_number: int, review: PRReviewResult, ) -> int: """Post inline comments for issues with line numbers.""" comments = [] all_issues = review.issues + review.security_issues for issue in all_issues: if issue.line and issue.file: comment_body = ( f"**[{issue.severity}] {issue.category}**\n\n" f"{issue.description}\n\n" f"**Recommendation:** {issue.recommendation}" ) comments.append( { "path": issue.file, "line": issue.line, "body": comment_body, } ) if not comments: return 0 try: # Use Gitea's pull request review API for inline comments self.gitea.create_pull_request_review( owner=owner, repo=repo, index=pr_number, body="AI Code Review - Inline Comments", event="COMMENT", comments=comments[:10], # Limit to 10 inline comments ) return min(len(comments), 10) except Exception as e: self.logger.warning(f"Failed to post inline comments: {e}") return 0 def _generate_summary_comment(self, review: PRReviewResult) -> str: """Generate the summary comment for the PR.""" lines = [ f"{self.AI_DISCLAIMER}", "", "## AI Code Review", "", review.summary, "", ] # Statistics all_issues = review.issues + review.security_issues high = sum(1 for i in all_issues if i.severity == "HIGH") medium = sum(1 for i in all_issues if i.severity == "MEDIUM") low = sum(1 for i in all_issues if i.severity == "LOW") lines.append("### Summary") lines.append("") lines.append(f"| Severity | Count |") lines.append(f"|----------|-------|") lines.append(f"| HIGH | {high} |") lines.append(f"| MEDIUM | {medium} |") lines.append(f"| LOW | {low} |") lines.append("") # Security issues section if review.security_issues: lines.append("### Security Issues") lines.append("") for issue in review.security_issues[:5]: lines.append( f"- **[{issue.severity}]** `{issue.file}:{issue.line}` - {issue.description}" ) lines.append("") # Other issues (limit display) other_issues = [i for i in review.issues if i not in review.security_issues] if other_issues: lines.append("### Review Findings") lines.append("") for issue in other_issues[:10]: loc = ( f"`{issue.file}:{issue.line}`" if issue.line else f"`{issue.file}`" ) lines.append(f"- **[{issue.severity}]** {loc} - {issue.description}") if len(other_issues) > 10: lines.append(f"- ...and {len(other_issues) - 10} more issues") lines.append("") # Verdict lines.append("---") lines.append(f"**Overall Severity:** `{review.overall_severity}`") if review.approval: lines.append("**AI Recommendation:** Approve") else: lines.append("**AI Recommendation:** Changes Requested") return "\n".join(lines) def _apply_review_labels( self, owner: str, repo: str, pr_number: int, review: PRReviewResult, ) -> list[str]: """Apply labels based on review result.""" try: repo_labels = self.gitea.get_repo_labels(owner, repo) label_map = {l["name"]: l["id"] for l in repo_labels} except Exception as e: self.logger.warning(f"Failed to get repo labels: {e}") return [] labels_to_add = [] # Add approval/changes required label # Use helper to support both old string and new dict format if review.approval: label_config = self._get_label_config("status", "ai_approved") else: label_config = self._get_label_config("status", "ai_changes_required") label_name = label_config.get("name", "") if label_name and label_name in label_map: labels_to_add.append(label_map[label_name]) if labels_to_add: try: self.gitea.add_issue_labels(owner, repo, pr_number, labels_to_add) return [name for name, id in label_map.items() if id in labels_to_add] except Exception as e: self.logger.warning(f"Failed to add labels: {e}") return []