"""Issue Review Agent AI agent for triaging, labeling, and responding to issues. Handles issue.opened, issue.labeled, and issue_comment events. """ import logging import re from dataclasses import dataclass from agents.base_agent import AgentContext, AgentResult, BaseAgent @dataclass class TriageResult: """Result of issue triage analysis.""" issue_type: str priority: str confidence: float summary: str suggested_labels: list[str] is_duplicate: bool duplicate_of: int | None needs_more_info: bool missing_info: list[str] components: list[str] reasoning: str class IssueAgent(BaseAgent): """Agent for handling issue events.""" # Marker specific to issue comments ISSUE_AI_MARKER = "" def can_handle(self, event_type: str, event_data: dict) -> bool: """Check if this agent handles the given event.""" # Check if agent is enabled agent_config = self.config.get("agents", {}).get("issue", {}) if not agent_config.get("enabled", True): return False # Handle issue events if event_type == "issues": action = event_data.get("action", "") allowed_events = agent_config.get("events", ["opened", "labeled"]) if action not in allowed_events: return False # Ignore our own codebase reports to prevent double-commenting issue = event_data.get("issue", {}) title = issue.get("title", "") labels = [l.get("name") for l in issue.get("labels", [])] if "AI Codebase Report" in title or "ai-codebase-report" in labels: return False return True # Handle issue comment events (for @mentions) if event_type == "issue_comment": action = event_data.get("action", "") if action == "created": comment_body = event_data.get("comment", {}).get("body", "") mention_prefix = self.config.get("interaction", {}).get( "mention_prefix", "@ai-bot" ) return mention_prefix in comment_body return False def execute(self, context: AgentContext) -> AgentResult: """Execute the issue agent.""" event_data = context.event_data action = event_data.get("action", "") if context.event_type == "issues": if action == "opened": return self._handle_issue_opened(context) elif action == "labeled": return self._handle_issue_labeled(context) if context.event_type == "issue_comment": return self._handle_issue_comment(context) return AgentResult( success=False, message=f"Unknown action: {action}", ) def _handle_issue_opened(self, context: AgentContext) -> AgentResult: """Handle a newly opened issue.""" issue = context.event_data.get("issue", {}) issue_index = issue.get("number") title = issue.get("title", "") body = issue.get("body", "") author = issue.get("user", {}).get("login", "unknown") existing_labels = [l.get("name", "") for l in issue.get("labels", [])] self.logger.info(f"Triaging issue #{issue_index}: {title}") # Step 1: Triage the issue triage = self._triage_issue(title, body, author, existing_labels) actions_taken = [] # Step 2: Apply labels if auto-label is enabled agent_config = self.config.get("agents", {}).get("issue", {}) if agent_config.get("auto_label", True): labels_applied = self._apply_labels( context.owner, context.repo, issue_index, triage ) if labels_applied: actions_taken.append(f"Applied labels: {labels_applied}") # Step 3: Post triage comment comment = self._generate_triage_comment(triage, issue) self.upsert_comment( context.owner, context.repo, issue_index, comment, marker=self.ISSUE_AI_MARKER, ) actions_taken.append("Posted triage comment") return AgentResult( success=True, message=f"Triaged issue #{issue_index} as {triage.issue_type} ({triage.priority} priority)", data={ "triage": { "type": triage.issue_type, "priority": triage.priority, "confidence": triage.confidence, } }, actions_taken=actions_taken, ) def _handle_issue_labeled(self, context: AgentContext) -> AgentResult: """Handle label addition to an issue.""" # Could be used for specific label-triggered actions issue = context.event_data.get("issue", {}) label = context.event_data.get("label", {}) return AgentResult( success=True, message=f"Noted label '{label.get('name')}' added to issue #{issue.get('number')}", ) def _handle_issue_comment(self, context: AgentContext) -> AgentResult: """Handle @mention in issue comment.""" issue = context.event_data.get("issue", {}) comment = context.event_data.get("comment", {}) issue_index = issue.get("number") comment_body = comment.get("body", "") comment_author = comment.get("user", {}).get("login", "user") # Parse command from mention command = self._parse_command(comment_body) if command: response = self._handle_command(context, issue, command) # Add user mention at the top response_with_mention = f"@{comment_author}\n\n{response}" self.gitea.create_issue_comment( context.owner, context.repo, issue_index, response_with_mention ) return AgentResult( success=True, message=f"Responded to command: {command}", actions_taken=["Posted command response"], ) return AgentResult( success=True, message="No actionable command found in mention", ) def _triage_issue( self, title: str, body: str, author: str, existing_labels: list[str], ) -> TriageResult: """Use LLM to triage the issue.""" prompt_template = self.load_prompt("issue_triage") prompt = prompt_template.format( title=title, body=body or "(no description provided)", author=author, existing_labels=", ".join(existing_labels) if existing_labels else "none", ) try: result = self.call_llm_json(prompt) return TriageResult( issue_type=result.get("type", "question"), priority=result.get("priority", "medium"), confidence=result.get("confidence", 0.5), summary=result.get("summary", title), suggested_labels=result.get("suggested_labels", []), is_duplicate=result.get("is_duplicate", False), duplicate_of=result.get("duplicate_of"), needs_more_info=result.get("needs_more_info", False), missing_info=result.get("missing_info", []), components=result.get("components", []), reasoning=result.get("reasoning", ""), ) except Exception as e: self.logger.warning(f"LLM triage failed: {e}") # Return default triage on failure return TriageResult( issue_type="question", priority="medium", confidence=0.3, summary=title, suggested_labels=[], is_duplicate=False, duplicate_of=None, needs_more_info=True, missing_info=["Unable to parse issue automatically"], components=[], reasoning="Automatic triage failed, needs human review", ) def _get_label_name(self, label_config: str | dict) -> str: """Get label name from config (supports both old string and new dict format). Args: label_config: Either a string (old format) or dict with 'name' key (new format) Returns: Label name as string """ if isinstance(label_config, str): return label_config elif isinstance(label_config, dict): return label_config.get("name", "") return "" def _get_label_config(self, category: str, key: str) -> dict: """Get full label configuration from config. Args: category: Label category (type, priority, status) key: Label key within category (bug, high, etc.) Returns: Dict with name, color, description, aliases """ labels_config = self.config.get("labels", {}) category_config = labels_config.get(category, {}) label_config = category_config.get(key, {}) # Handle old string format if isinstance(label_config, str): return { "name": label_config, "color": "1d76db", # Default blue "description": "", "aliases": [], } # Handle new dict format return { "name": label_config.get("name", ""), "color": label_config.get("color", "1d76db"), "description": label_config.get("description", ""), "aliases": label_config.get("aliases", []), } def _apply_labels( self, owner: str, repo: str, issue_index: int, triage: TriageResult, ) -> list[str]: """Apply labels based on triage result.""" # Get all repo labels try: repo_labels = self.gitea.get_repo_labels(owner, repo) label_map = {l["name"]: l["id"] for l in repo_labels} except Exception as e: self.logger.warning(f"Failed to get repo labels: {e}") return [] labels_to_add = [] # Map priority using new helper priority_config = self._get_label_config("priority", triage.priority) priority_label_name = priority_config["name"] if priority_label_name and priority_label_name in label_map: labels_to_add.append(label_map[priority_label_name]) # Map type using new helper type_config = self._get_label_config("type", triage.issue_type) type_label_name = type_config["name"] if type_label_name and type_label_name in label_map: labels_to_add.append(label_map[type_label_name]) # Add AI reviewed label using new helper reviewed_config = self._get_label_config("status", "ai_reviewed") reviewed_label_name = reviewed_config["name"] if reviewed_label_name and reviewed_label_name in label_map: labels_to_add.append(label_map[reviewed_label_name]) if labels_to_add: try: self.gitea.add_issue_labels(owner, repo, issue_index, labels_to_add) return [name for name, id in label_map.items() if id in labels_to_add] except Exception as e: self.logger.warning(f"Failed to add labels: {e}") return [] def _generate_triage_comment(self, triage: TriageResult, issue: dict) -> str: """Generate a triage summary comment.""" lines = [ f"{self.AI_DISCLAIMER}", "", "## AI Issue Triage", "", f"| Field | Value |", f"|-------|--------|", f"| **Type** | {triage.issue_type.capitalize()} |", f"| **Priority** | {triage.priority.capitalize()} |", f"| **Confidence** | {triage.confidence:.0%} |", "", ] if triage.summary != issue.get("title"): lines.append(f"**Summary:** {triage.summary}") lines.append("") if triage.components: lines.append(f"**Components:** {', '.join(triage.components)}") lines.append("") if triage.needs_more_info and triage.missing_info: lines.append("### Additional Information Needed") lines.append("") for info in triage.missing_info: lines.append(f"- {info}") lines.append("") if triage.is_duplicate and triage.duplicate_of: lines.append(f"### Possible Duplicate") lines.append(f"This issue may be a duplicate of #{triage.duplicate_of}") lines.append("") lines.append("---") lines.append(f"*{triage.reasoning}*") return "\n".join(lines) def _parse_command(self, body: str) -> str | None: """Parse a command from a comment body.""" mention_prefix = self.config.get("interaction", {}).get( "mention_prefix", "@ai-bot" ) commands = self.config.get("interaction", {}).get( "commands", ["explain", "suggest", "security", "summarize", "triage"] ) # Also check for setup-labels command (not in config since it's a setup command) if f"{mention_prefix} setup-labels" in body.lower(): return "setup-labels" for command in commands: if f"{mention_prefix} {command}" in body.lower(): return command return None def _handle_command(self, context: AgentContext, issue: dict, command: str) -> str: """Handle a command from an @mention.""" title = issue.get("title", "") body = issue.get("body", "") if command == "help": return self._command_help() elif command == "summarize": return self._command_summarize(title, body) elif command == "explain": return self._command_explain(title, body) elif command == "suggest": return self._command_suggest(title, body) elif command == "triage": return self._command_triage(context, issue) elif command == "setup-labels": return self._command_setup_labels(context, issue) return f"{self.AI_DISCLAIMER}\n\nSorry, I don't understand the command `{command}`." def _command_summarize(self, title: str, body: str) -> str: """Generate a summary of the issue.""" prompt = f"""Summarize the following issue in 2-3 concise sentences: Title: {title} Body: {body} Provide only the summary, no additional formatting.""" try: response = self.call_llm(prompt) return f"{self.AI_DISCLAIMER}\n\n**Summary:**\n{response.content}" except Exception as e: return f"{self.AI_DISCLAIMER}\n\nSorry, I was unable to generate a summary. Error: {e}" def _command_explain(self, title: str, body: str) -> str: """Explain the issue in more detail.""" prompt = f"""Analyze this issue and provide a clear explanation of what the user is asking for or reporting: Title: {title} Body: {body} Provide: 1. What the issue is about 2. What the user expects 3. Any technical context that might be relevant Be concise and helpful.""" try: response = self.call_llm(prompt) return f"{self.AI_DISCLAIMER}\n\n**Explanation:**\n{response.content}" except Exception as e: return f"{self.AI_DISCLAIMER}\n\nSorry, I was unable to explain this issue. Error: {e}" def _command_suggest(self, title: str, body: str) -> str: """Suggest solutions for the issue.""" prompt = f"""Based on this issue, suggest potential solutions or next steps: Title: {title} Body: {body} Provide 2-3 actionable suggestions. If this is a bug, suggest debugging steps. If this is a feature request, suggest implementation approaches. Be practical and concise.""" try: response = self.call_llm(prompt) return f"{self.AI_DISCLAIMER}\n\n**Suggestions:**\n{response.content}" except Exception as e: return f"{self.AI_DISCLAIMER}\n\nSorry, I was unable to generate suggestions. Error: {e}" def _command_help(self) -> str: """Generate help message with all available commands.""" mention_prefix = self.config.get("interaction", {}).get( "mention_prefix", "@codebot" ) help_text = f"""{self.AI_DISCLAIMER} ## Available {mention_prefix} Commands ### Issue Triage & Analysis - `{mention_prefix} triage` - Full issue triage with auto-labeling and priority assignment - `{mention_prefix} summarize` - Generate 2-3 sentence summary of the issue - `{mention_prefix} explain` - Detailed explanation of what the issue is about - `{mention_prefix} suggest` - Solution suggestions or next steps - `{mention_prefix} security` - Security-focused analysis of the issue ### Interactive Chat - `{mention_prefix} [question]` - Ask questions about the codebase (uses search & file reading tools) - Example: `{mention_prefix} how does authentication work?` - Example: `{mention_prefix} find all API endpoints` ### Setup & Utility - `{mention_prefix} help` - Show this help message - `{mention_prefix} setup-labels` - Auto-create/map repository labels for auto-labeling ### Pull Request Analysis PR reviews run automatically when you open or update a pull request. The bot provides: - Inline code review comments - Security vulnerability scanning - Approval or change-request recommendations --- ### Quick Examples **Triage an issue:** ``` {mention_prefix} triage ``` **Get help understanding:** ``` {mention_prefix} explain ``` **Ask about the codebase:** ``` {mention_prefix} how does the authentication system work? ``` **Setup repository labels:** ``` {mention_prefix} setup-labels ``` --- *For full documentation, see the [README](https://github.com/YourOrg/OpenRabbit/blob/main/README.md)* """ return help_text def _command_triage(self, context: AgentContext, issue: dict) -> str: """Perform full triage analysis on the issue.""" title = issue.get("title", "") body = issue.get("body", "") author = issue.get("user", {}).get("login", "unknown") existing_labels = [l.get("name", "") for l in issue.get("labels", [])] issue_index = issue.get("number") try: # Perform triage analysis triage = self._triage_issue(title, body, author, existing_labels) # Apply labels if enabled agent_config = self.config.get("agents", {}).get("issue", {}) labels_applied = [] if agent_config.get("auto_label", True): labels_applied = self._apply_labels( context.owner, context.repo, issue_index, triage ) # Generate response response = self._generate_triage_comment(triage, issue) if labels_applied: response += f"\n\n**Labels Applied:** {', '.join(labels_applied)}" return response except Exception as e: return f"{self.AI_DISCLAIMER}\n\nSorry, I was unable to triage this issue. Error: {e}" def _command_setup_labels(self, context: AgentContext, issue: dict) -> str: """Setup repository labels for auto-labeling.""" owner = context.owner repo = context.repo try: # Get existing labels existing_labels = self.gitea.get_repo_labels(owner, repo) existing_names = { label["name"].lower(): label["name"] for label in existing_labels } # Detect schema schema = self._detect_label_schema(existing_labels) # Determine mode if schema and len(existing_labels) >= 5: # Repository has existing labels, use mapping mode return self._setup_labels_map_mode( owner, repo, existing_labels, schema, existing_names ) else: # Fresh repository or few labels, use create mode return self._setup_labels_create_mode(owner, repo, existing_names) except Exception as e: self.logger.error(f"Label setup failed: {e}") return f"{self.AI_DISCLAIMER}\n\n**Label Setup Failed**\n\nError: {e}\n\nPlease ensure the bot has write access to this repository." def _detect_label_schema(self, labels: list[dict]) -> dict | None: """Detect the naming pattern used in existing labels. Returns: { "pattern": "prefix_slash" | "prefix_dash" | "colon", "categories": { "type": ["Kind/Bug", "Kind/Feature", ...], "priority": ["Priority - High", ...], } } """ patterns_config = self.config.get("label_patterns", {}) patterns = { "prefix_slash": re.compile( patterns_config.get("prefix_slash", r"^(Kind|Type|Category)/(.+)$") ), "prefix_dash": re.compile( patterns_config.get( "prefix_dash", r"^(Priority|Status|Reviewed) - (.+)$" ) ), "colon": re.compile( patterns_config.get("colon", r"^(type|priority|status): (.+)$") ), } categorized = {} detected_pattern = None for label in labels: name = label["name"] for pattern_name, regex in patterns.items(): match = regex.match(name) if match: category = match.group(1).lower() # Normalize category names if category == "kind": category = "type" elif category == "reviewed": category = "status" if category not in categorized: categorized[category] = [] categorized[category].append(name) detected_pattern = pattern_name break if not categorized: return None return {"pattern": detected_pattern, "categories": categorized} def _build_label_mapping(self, existing_labels: list[dict], schema: dict) -> dict: """Build mapping from OpenRabbit schema to existing labels. Returns: { "type": { "bug": "Kind/Bug", "feature": "Kind/Feature", }, "priority": { "high": "Priority - High", } } """ mapping = {} label_names_lower = { label["name"].lower(): label["name"] for label in existing_labels } # Get all configured labels with their aliases labels_config = self.config.get("labels", {}) for category in ["type", "priority", "status"]: category_config = labels_config.get(category, {}) mapping[category] = {} for key, label_def in category_config.items(): config = self._get_label_config(category, key) aliases = config.get("aliases", []) # Try to find a match using aliases for alias in aliases: if alias.lower() in label_names_lower: mapping[category][key] = label_names_lower[alias.lower()] break return mapping def _setup_labels_map_mode( self, owner: str, repo: str, existing_labels: list[dict], schema: dict, existing_names: dict, ) -> str: """Map existing labels to OpenRabbit schema.""" # Build mapping mapping = self._build_label_mapping(existing_labels, schema) # Get required labels required_labels = self._get_required_labels() # Find missing labels missing = [] for category, items in required_labels.items(): for key in items: if key not in mapping.get(category, {}): missing.append((category, key)) # Format report lines = [f"{self.AI_DISCLAIMER}\n"] lines.append("## Label Schema Detected\n") lines.append( f"Found {len(existing_labels)} existing labels with pattern: `{schema['pattern']}`\n" ) lines.append("**Detected Categories:**") for category, labels in schema["categories"].items(): lines.append(f"- **{category.title()}** ({len(labels)} labels)") lines.append("") lines.append("**Proposed Mapping:**\n") lines.append("| OpenRabbit Expected | Your Existing Label | Status |") lines.append("|---------------------|---------------------|--------|") for category, items in required_labels.items(): for key in items: openrabbit_config = self._get_label_config(category, key) openrabbit_name = openrabbit_config["name"] if key in mapping.get(category, {}): existing_name = mapping[category][key] lines.append( f"| `{openrabbit_name}` | `{existing_name}` | ✅ Map |" ) else: lines.append(f"| `{openrabbit_name}` | *(missing)* | ⚠️ Create |") lines.append("") # Create missing labels if missing: lines.append(f"**Creating Missing Labels ({len(missing)}):**\n") created_count = 0 for category, key in missing: config = self._get_label_config(category, key) suggested_name = self._suggest_label_name( category, key, schema["pattern"] ) # Check if label already exists (case-insensitive) if suggested_name.lower() not in existing_names: try: self.gitea.create_label( owner, repo, suggested_name, config["color"], config["description"], ) lines.append( f"✅ Created `{suggested_name}` (#{config['color']})" ) created_count += 1 except Exception as e: lines.append(f"❌ Failed to create `{suggested_name}`: {e}") else: lines.append(f"⚠️ `{suggested_name}` already exists") lines.append("") if created_count > 0: lines.append(f"**✅ Created {created_count} new labels!**") else: lines.append("**✅ All Required Labels Present!**") lines.append("\n**Setup Complete!**") lines.append("Auto-labeling will use your existing label schema.") return "\n".join(lines) def _setup_labels_create_mode( self, owner: str, repo: str, existing_names: dict ) -> str: """Create OpenRabbit default labels.""" lines = [f"{self.AI_DISCLAIMER}\n"] lines.append("## Creating OpenRabbit Labels\n") # Get all required labels required_labels = self._get_required_labels() created = [] skipped = [] failed = [] for category, items in required_labels.items(): for key in items: config = self._get_label_config(category, key) label_name = config["name"] # Check if already exists (case-insensitive) if label_name.lower() in existing_names: skipped.append(label_name) continue try: self.gitea.create_label( owner, repo, label_name, config["color"], config["description"] ) created.append((label_name, config["color"])) except Exception as e: failed.append((label_name, str(e))) if created: lines.append(f"**✅ Created {len(created)} Labels:**\n") for name, color in created: lines.append(f"- `{name}` (#{color})") lines.append("") if skipped: lines.append(f"**⚠️ Skipped {len(skipped)} Existing Labels:**\n") for name in skipped: lines.append(f"- `{name}`") lines.append("") if failed: lines.append(f"**❌ Failed to Create {len(failed)} Labels:**\n") for name, error in failed: lines.append(f"- `{name}`: {error}") lines.append("") lines.append("**✅ Setup Complete!**") lines.append("Auto-labeling is now configured.") return "\n".join(lines) def _get_required_labels(self) -> dict: """Get all required label categories and keys. Returns: { "type": ["bug", "feature", "question", "docs"], "priority": ["high", "medium", "low"], "status": ["ai_approved", "ai_changes_required", "ai_reviewed"] } """ labels_config = self.config.get("labels", {}) required = {} for category in ["type", "priority", "status"]: category_config = labels_config.get(category, {}) required[category] = list(category_config.keys()) return required def _suggest_label_name(self, category: str, key: str, pattern: str) -> str: """Suggest a label name based on detected pattern.""" # Get the configured name first config = self._get_label_config(category, key) base_name = config["name"] if pattern == "prefix_slash": prefix = "Kind" if category == "type" else category.title() value = key.replace("_", " ").title() return f"{prefix}/{value}" elif pattern == "prefix_dash": prefix = "Kind" if category == "type" else category.title() value = key.replace("_", " ").title() return f"{prefix} - {value}" else: # colon or unknown return base_name