Files
openrabbit/tools/ai-review/agents/issue_agent.py
latte b428d0a37e
All checks were successful
Enterprise AI Code Review / ai-review (pull_request) Successful in 27s
feat: Add @codebot review-again command for manual PR re-review
- Add review-again command to trigger PR review without new commits
- Implement review comparison logic to show resolved/new/changed issues
- Update workflow to handle PR comments via dispatcher
- Add comprehensive help documentation in README and CLAUDE.md
- Show diff from previous review with resolved/new issues count
- Update PR labels based on new severity assessment
- Support re-evaluation after config changes or false positive clarification

Key features:
- Shows diff from previous review (resolved/new/changed issues)
- 🏷️ Updates labels based on new severity
- No need for empty commits to trigger review
- 🔧 Respects latest .ai-review.yml configuration

Closes feature request for manual PR re-review capability
2025-12-28 19:12:34 +00:00

857 lines
30 KiB
Python

"""Issue Review Agent
AI agent for triaging, labeling, and responding to issues.
Handles issue.opened, issue.labeled, and issue_comment events.
"""
import logging
import re
from dataclasses import dataclass
from agents.base_agent import AgentContext, AgentResult, BaseAgent
@dataclass
class TriageResult:
"""Result of issue triage analysis."""
issue_type: str
priority: str
confidence: float
summary: str
suggested_labels: list[str]
is_duplicate: bool
duplicate_of: int | None
needs_more_info: bool
missing_info: list[str]
components: list[str]
reasoning: str
class IssueAgent(BaseAgent):
"""Agent for handling issue events."""
# Marker specific to issue comments
ISSUE_AI_MARKER = "<!-- AI_ISSUE_TRIAGE -->"
def can_handle(self, event_type: str, event_data: dict) -> bool:
    """Decide whether this agent should process the given event.

    Args:
        event_type: Event name, e.g. ``"issues"`` or ``"issue_comment"``.
        event_data: Decoded event payload.

    Returns:
        True when the issue agent is enabled and either
        * an ``issues`` event carries an allowed action and is not one of
          the bot's own codebase reports, or
        * a newly created issue comment contains the configured mention
          prefix.
    """
    agent_config = self.config.get("agents", {}).get("issue", {})
    if not agent_config.get("enabled", True):
        return False

    action = event_data.get("action", "")

    if event_type == "issues":
        allowed_events = agent_config.get("events", ["opened", "labeled"])
        if action not in allowed_events:
            return False

        # Payload fields may be present but null; ``.get(key, default)``
        # does not cover that case, so fall back explicitly.
        issue = event_data.get("issue") or {}
        title = issue.get("title") or ""
        label_names = [
            label.get("name")
            for label in (issue.get("labels") or [])
            if isinstance(label, dict)
        ]

        # Ignore our own codebase reports to prevent double-commenting.
        is_own_report = (
            "AI Codebase Report" in title or "ai-codebase-report" in label_names
        )
        return not is_own_report

    if event_type == "issue_comment" and action == "created":
        comment_body = (event_data.get("comment") or {}).get("body") or ""
        mention_prefix = self.config.get("interaction", {}).get(
            "mention_prefix", "@ai-bot"
        )
        return mention_prefix in comment_body

    return False
def execute(self, context: AgentContext) -> AgentResult:
    """Route the event in ``context`` to the matching handler.

    Args:
        context: Agent context carrying ``event_type`` and ``event_data``.

    Returns:
        The handler's result, or an unsuccessful result naming the action
        when no handler applies.
    """
    action = context.event_data.get("action", "")
    event_type = context.event_type

    # Issue lifecycle events are routed by their action.
    if event_type == "issues" and action == "opened":
        return self._handle_issue_opened(context)
    if event_type == "issues" and action == "labeled":
        return self._handle_issue_labeled(context)

    # Every issue comment goes to the comment handler, whatever the action.
    if event_type == "issue_comment":
        return self._handle_issue_comment(context)

    return AgentResult(success=False, message=f"Unknown action: {action}")
def _handle_issue_opened(self, context: AgentContext) -> AgentResult:
    """Triage a newly opened issue, optionally label it, and comment.

    Args:
        context: Agent context whose ``event_data`` holds the issue payload.

    Returns:
        A successful result describing the triage outcome and the actions
        that were carried out.
    """
    issue = context.event_data.get("issue", {})
    number = issue.get("number")
    title = issue.get("title", "")
    description = issue.get("body", "")
    reporter = issue.get("user", {}).get("login", "unknown")
    current_labels = [entry.get("name", "") for entry in issue.get("labels", [])]

    self.logger.info(f"Triaging issue #{number}: {title}")

    # Step 1: analyse the issue.
    triage = self._triage_issue(title, description, reporter, current_labels)
    performed = []

    # Step 2: label the issue unless auto-labeling is switched off.
    issue_settings = self.config.get("agents", {}).get("issue", {})
    if issue_settings.get("auto_label", True):
        applied = self._apply_labels(context.owner, context.repo, number, triage)
        if applied:
            performed.append(f"Applied labels: {applied}")

    # Step 3: publish the triage summary, tagged with the issue marker.
    self.upsert_comment(
        context.owner,
        context.repo,
        number,
        self._generate_triage_comment(triage, issue),
        marker=self.ISSUE_AI_MARKER,
    )
    performed.append("Posted triage comment")

    triage_data = {
        "type": triage.issue_type,
        "priority": triage.priority,
        "confidence": triage.confidence,
    }
    return AgentResult(
        success=True,
        message=f"Triaged issue #{number} as {triage.issue_type} ({triage.priority} priority)",
        data={"triage": triage_data},
        actions_taken=performed,
    )
def _handle_issue_labeled(self, context: AgentContext) -> AgentResult:
    """Acknowledge that a label was added to an issue.

    No action is taken beyond reporting the label; this is the place to
    add label-triggered behaviour later.

    Args:
        context: Agent context whose ``event_data`` holds the ``issue`` and
            ``label`` payloads.

    Returns:
        A successful result naming the label and the issue number.
    """
    payload = context.event_data
    issue_number = payload.get("issue", {}).get("number")
    label_name = payload.get("label", {}).get("name")
    return AgentResult(
        success=True,
        message=f"Noted label '{label_name}' added to issue #{issue_number}",
    )
def _handle_issue_comment(self, context: AgentContext) -> AgentResult:
    """Answer a command given via @mention in an issue comment.

    Args:
        context: Agent context whose ``event_data`` holds the ``issue`` and
            ``comment`` payloads.

    Returns:
        A successful result. When a command was recognised, the response
        has been posted as a new comment addressed to the comment author.
    """
    issue = context.event_data.get("issue", {})
    comment = context.event_data.get("comment", {})
    issue_index = issue.get("number")
    text = comment.get("body", "")
    requester = comment.get("user", {}).get("login", "user")

    command = self._parse_command(text)
    if not command:
        return AgentResult(
            success=True,
            message="No actionable command found in mention",
        )

    reply = self._handle_command(context, issue, command)
    # Address the reply to whoever asked by mentioning them on the first line.
    self.gitea.create_issue_comment(
        context.owner,
        context.repo,
        issue_index,
        f"@{requester}\n\n{reply}",
    )
    return AgentResult(
        success=True,
        message=f"Responded to command: {command}",
        actions_taken=["Posted command response"],
    )
def _triage_issue(
    self,
    title: str,
    body: str,
    author: str,
    existing_labels: list[str],
) -> TriageResult:
    """Use the LLM to triage the issue.

    The LLM response is untrusted JSON, so every field is coerced to the
    type ``TriageResult`` declares before it is used. Without this, a
    non-numeric ``confidence`` or a non-string ``type``/``priority`` breaks
    the triage comment renderer, which applies ``:.0%`` and
    ``.capitalize()`` to those fields.

    Args:
        title: Issue title.
        body: Issue description; a placeholder is sent when it is empty.
        author: Login of the issue author.
        existing_labels: Names of labels already on the issue.

    Returns:
        The parsed triage, or a low-confidence default asking for human
        review when the LLM call fails or returns something unusable.
    """

    def as_text(value, default: str) -> str:
        # Accept only non-blank strings; anything else falls back.
        text = value.strip() if isinstance(value, str) else ""
        return text or default

    def as_flag(value) -> bool:
        # Models sometimes answer "true"/"false" as strings.
        if isinstance(value, str):
            return value.strip().lower() in {"true", "yes", "1"}
        return bool(value)

    def as_text_list(value) -> list[str]:
        if isinstance(value, str):
            return [value] if value.strip() else []
        if isinstance(value, (list, tuple)):
            return [str(item) for item in value if item is not None]
        return []

    def as_confidence(value) -> float:
        try:
            number = float(value)
        except (TypeError, ValueError):
            return 0.5
        if number != number:  # NaN compares unequal to itself
            return 0.5
        return max(0.0, min(1.0, number))

    def as_issue_number(value) -> int | None:
        if isinstance(value, bool):
            return None
        if isinstance(value, float) and value.is_integer():
            value = int(value)
        if isinstance(value, int):
            return value if value > 0 else None
        if isinstance(value, str):
            digits = value.strip().lstrip("#")
            if digits.isdigit() and int(digits) > 0:
                return int(digits)
        return None

    prompt_template = self.load_prompt("issue_triage")
    prompt = prompt_template.format(
        title=title,
        body=body or "(no description provided)",
        author=author,
        existing_labels=", ".join(existing_labels) if existing_labels else "none",
    )
    try:
        result = self.call_llm_json(prompt)
        if not isinstance(result, dict):
            raise TypeError(
                f"expected a JSON object, got {type(result).__name__}"
            )
        return TriageResult(
            issue_type=as_text(result.get("type"), "question"),
            priority=as_text(result.get("priority"), "medium"),
            confidence=as_confidence(result.get("confidence", 0.5)),
            summary=as_text(result.get("summary"), title),
            suggested_labels=as_text_list(result.get("suggested_labels")),
            is_duplicate=as_flag(result.get("is_duplicate", False)),
            duplicate_of=as_issue_number(result.get("duplicate_of")),
            needs_more_info=as_flag(result.get("needs_more_info", False)),
            missing_info=as_text_list(result.get("missing_info")),
            components=as_text_list(result.get("components")),
            reasoning=as_text(result.get("reasoning"), ""),
        )
    except Exception as e:
        # Best-effort: triage must never block the event handler.
        self.logger.warning(f"LLM triage failed: {e}")
        return TriageResult(
            issue_type="question",
            priority="medium",
            confidence=0.3,
            summary=title,
            suggested_labels=[],
            is_duplicate=False,
            duplicate_of=None,
            needs_more_info=True,
            missing_info=["Unable to parse issue automatically"],
            components=[],
            reasoning="Automatic triage failed, needs human review",
        )
def _get_label_name(self, label_config: str | dict) -> str:
"""Get label name from config (supports both old string and new dict format).
Args:
label_config: Either a string (old format) or dict with 'name' key (new format)
Returns:
Label name as string
"""
if isinstance(label_config, str):
return label_config
elif isinstance(label_config, dict):
return label_config.get("name", "")
return ""
def _get_label_config(self, category: str, key: str) -> dict:
"""Get full label configuration from config.
Args:
category: Label category (type, priority, status)
key: Label key within category (bug, high, etc.)
Returns:
Dict with name, color, description, aliases
"""
labels_config = self.config.get("labels", {})
category_config = labels_config.get(category, {})
label_config = category_config.get(key, {})
# Handle old string format
if isinstance(label_config, str):
return {
"name": label_config,
"color": "1d76db", # Default blue
"description": "",
"aliases": [],
}
# Handle new dict format
return {
"name": label_config.get("name", ""),
"color": label_config.get("color", "1d76db"),
"description": label_config.get("description", ""),
"aliases": label_config.get("aliases", []),
}
def _apply_labels(
self,
owner: str,
repo: str,
issue_index: int,
triage: TriageResult,
) -> list[str]:
"""Apply labels based on triage result."""
# Get all repo labels
try:
repo_labels = self.gitea.get_repo_labels(owner, repo)
label_map = {l["name"]: l["id"] for l in repo_labels}
except Exception as e:
self.logger.warning(f"Failed to get repo labels: {e}")
return []
labels_to_add = []
# Map priority using new helper
priority_config = self._get_label_config("priority", triage.priority)
priority_label_name = priority_config["name"]
if priority_label_name and priority_label_name in label_map:
labels_to_add.append(label_map[priority_label_name])
# Map type using new helper
type_config = self._get_label_config("type", triage.issue_type)
type_label_name = type_config["name"]
if type_label_name and type_label_name in label_map:
labels_to_add.append(label_map[type_label_name])
# Add AI reviewed label using new helper
reviewed_config = self._get_label_config("status", "ai_reviewed")
reviewed_label_name = reviewed_config["name"]
if reviewed_label_name and reviewed_label_name in label_map:
labels_to_add.append(label_map[reviewed_label_name])
if labels_to_add:
try:
self.gitea.add_issue_labels(owner, repo, issue_index, labels_to_add)
return [name for name, id in label_map.items() if id in labels_to_add]
except Exception as e:
self.logger.warning(f"Failed to add labels: {e}")
return []
def _generate_triage_comment(self, triage: TriageResult, issue: dict) -> str:
"""Generate a triage summary comment."""
lines = [
f"{self.AI_DISCLAIMER}",
"",
"## AI Issue Triage",
"",
f"| Field | Value |",
f"|-------|--------|",
f"| **Type** | {triage.issue_type.capitalize()} |",
f"| **Priority** | {triage.priority.capitalize()} |",
f"| **Confidence** | {triage.confidence:.0%} |",
"",
]
if triage.summary != issue.get("title"):
lines.append(f"**Summary:** {triage.summary}")
lines.append("")
if triage.components:
lines.append(f"**Components:** {', '.join(triage.components)}")
lines.append("")
if triage.needs_more_info and triage.missing_info:
lines.append("### Additional Information Needed")
lines.append("")
for info in triage.missing_info:
lines.append(f"- {info}")
lines.append("")
if triage.is_duplicate and triage.duplicate_of:
lines.append(f"### Possible Duplicate")
lines.append(f"This issue may be a duplicate of #{triage.duplicate_of}")
lines.append("")
lines.append("---")
lines.append(f"*{triage.reasoning}*")
return "\n".join(lines)
def _parse_command(self, body: str) -> str | None:
"""Parse a command from a comment body."""
mention_prefix = self.config.get("interaction", {}).get(
"mention_prefix", "@ai-bot"
)
commands = self.config.get("interaction", {}).get(
"commands", ["explain", "suggest", "security", "summarize", "triage"]
)
# Also check for setup-labels command (not in config since it's a setup command)
if f"{mention_prefix} setup-labels" in body.lower():
return "setup-labels"
for command in commands:
if f"{mention_prefix} {command}" in body.lower():
return command
return None
def _handle_command(self, context: AgentContext, issue: dict, command: str) -> str:
"""Handle a command from an @mention."""
title = issue.get("title", "")
body = issue.get("body", "")
if command == "help":
return self._command_help()
elif command == "summarize":
return self._command_summarize(title, body)
elif command == "explain":
return self._command_explain(title, body)
elif command == "suggest":
return self._command_suggest(title, body)
elif command == "triage":
return self._command_triage(context, issue)
elif command == "setup-labels":
return self._command_setup_labels(context, issue)
return f"{self.AI_DISCLAIMER}\n\nSorry, I don't understand the command `{command}`."
def _command_summarize(self, title: str, body: str) -> str:
"""Generate a summary of the issue."""
prompt = f"""Summarize the following issue in 2-3 concise sentences:
Title: {title}
Body: {body}
Provide only the summary, no additional formatting."""
try:
response = self.call_llm(prompt)
return f"{self.AI_DISCLAIMER}\n\n**Summary:**\n{response.content}"
except Exception as e:
return f"{self.AI_DISCLAIMER}\n\nSorry, I was unable to generate a summary. Error: {e}"
def _command_explain(self, title: str, body: str) -> str:
"""Explain the issue in more detail."""
prompt = f"""Analyze this issue and provide a clear explanation of what the user is asking for or reporting:
Title: {title}
Body: {body}
Provide:
1. What the issue is about
2. What the user expects
3. Any technical context that might be relevant
Be concise and helpful."""
try:
response = self.call_llm(prompt)
return f"{self.AI_DISCLAIMER}\n\n**Explanation:**\n{response.content}"
except Exception as e:
return f"{self.AI_DISCLAIMER}\n\nSorry, I was unable to explain this issue. Error: {e}"
def _command_suggest(self, title: str, body: str) -> str:
"""Suggest solutions for the issue."""
prompt = f"""Based on this issue, suggest potential solutions or next steps:
Title: {title}
Body: {body}
Provide 2-3 actionable suggestions. If this is a bug, suggest debugging steps. If this is a feature request, suggest implementation approaches.
Be practical and concise."""
try:
response = self.call_llm(prompt)
return f"{self.AI_DISCLAIMER}\n\n**Suggestions:**\n{response.content}"
except Exception as e:
return f"{self.AI_DISCLAIMER}\n\nSorry, I was unable to generate suggestions. Error: {e}"
def _command_help(self) -> str:
"""Generate help message with all available commands."""
mention_prefix = self.config.get("interaction", {}).get(
"mention_prefix", "@codebot"
)
help_text = f"""{self.AI_DISCLAIMER}
## Available {mention_prefix} Commands
### Issue Triage & Analysis
- `{mention_prefix} triage` - Full issue triage with auto-labeling and priority assignment
- `{mention_prefix} summarize` - Generate 2-3 sentence summary of the issue
- `{mention_prefix} explain` - Detailed explanation of what the issue is about
- `{mention_prefix} suggest` - Solution suggestions or next steps
- `{mention_prefix} security` - Security-focused analysis of the issue
### Interactive Chat
- `{mention_prefix} [question]` - Ask questions about the codebase (uses search & file reading tools)
- Example: `{mention_prefix} how does authentication work?`
- Example: `{mention_prefix} find all API endpoints`
### Setup & Utility
- `{mention_prefix} help` - Show this help message
- `{mention_prefix} setup-labels` - Auto-create/map repository labels for auto-labeling
### Pull Request Analysis
PR reviews run automatically when you open or update a pull request. The bot provides:
- Inline code review comments
- Security vulnerability scanning
- Approval or change-request recommendations
**Manual re-review:**
- `{mention_prefix} review-again` - Re-run AI review on current PR state (in PR comments)
- Shows diff from previous review (resolved/new issues)
- Updates labels and recommendations
- Useful after addressing feedback or updating config
---
### Quick Examples
**Triage an issue:**
```
{mention_prefix} triage
```
**Get help understanding:**
```
{mention_prefix} explain
```
**Ask about the codebase:**
```
{mention_prefix} how does the authentication system work?
```
**Setup repository labels:**
```
{mention_prefix} setup-labels
```
---
*For full documentation, see the [README](https://github.com/YourOrg/OpenRabbit/blob/main/README.md)*
"""
return help_text
def _command_triage(self, context: AgentContext, issue: dict) -> str:
"""Perform full triage analysis on the issue."""
title = issue.get("title", "")
body = issue.get("body", "")
author = issue.get("user", {}).get("login", "unknown")
existing_labels = [l.get("name", "") for l in issue.get("labels", [])]
issue_index = issue.get("number")
try:
# Perform triage analysis
triage = self._triage_issue(title, body, author, existing_labels)
# Apply labels if enabled
agent_config = self.config.get("agents", {}).get("issue", {})
labels_applied = []
if agent_config.get("auto_label", True):
labels_applied = self._apply_labels(
context.owner, context.repo, issue_index, triage
)
# Generate response
response = self._generate_triage_comment(triage, issue)
if labels_applied:
response += f"\n\n**Labels Applied:** {', '.join(labels_applied)}"
return response
except Exception as e:
return f"{self.AI_DISCLAIMER}\n\nSorry, I was unable to triage this issue. Error: {e}"
def _command_setup_labels(self, context: AgentContext, issue: dict) -> str:
"""Setup repository labels for auto-labeling."""
owner = context.owner
repo = context.repo
try:
# Get existing labels
existing_labels = self.gitea.get_repo_labels(owner, repo)
existing_names = {
label["name"].lower(): label["name"] for label in existing_labels
}
# Detect schema
schema = self._detect_label_schema(existing_labels)
# Determine mode
if schema and len(existing_labels) >= 5:
# Repository has existing labels, use mapping mode
return self._setup_labels_map_mode(
owner, repo, existing_labels, schema, existing_names
)
else:
# Fresh repository or few labels, use create mode
return self._setup_labels_create_mode(owner, repo, existing_names)
except Exception as e:
self.logger.error(f"Label setup failed: {e}")
return f"{self.AI_DISCLAIMER}\n\n**Label Setup Failed**\n\nError: {e}\n\nPlease ensure the bot has write access to this repository."
def _detect_label_schema(self, labels: list[dict]) -> dict | None:
"""Detect the naming pattern used in existing labels.
Returns:
{
"pattern": "prefix_slash" | "prefix_dash" | "colon",
"categories": {
"type": ["Kind/Bug", "Kind/Feature", ...],
"priority": ["Priority - High", ...],
}
}
"""
patterns_config = self.config.get("label_patterns", {})
patterns = {
"prefix_slash": re.compile(
patterns_config.get("prefix_slash", r"^(Kind|Type|Category)/(.+)$")
),
"prefix_dash": re.compile(
patterns_config.get(
"prefix_dash", r"^(Priority|Status|Reviewed) - (.+)$"
)
),
"colon": re.compile(
patterns_config.get("colon", r"^(type|priority|status): (.+)$")
),
}
categorized = {}
detected_pattern = None
for label in labels:
name = label["name"]
for pattern_name, regex in patterns.items():
match = regex.match(name)
if match:
category = match.group(1).lower()
# Normalize category names
if category == "kind":
category = "type"
elif category == "reviewed":
category = "status"
if category not in categorized:
categorized[category] = []
categorized[category].append(name)
detected_pattern = pattern_name
break
if not categorized:
return None
return {"pattern": detected_pattern, "categories": categorized}
def _build_label_mapping(self, existing_labels: list[dict], schema: dict) -> dict:
"""Build mapping from OpenRabbit schema to existing labels.
Returns:
{
"type": {
"bug": "Kind/Bug",
"feature": "Kind/Feature",
},
"priority": {
"high": "Priority - High",
}
}
"""
mapping = {}
label_names_lower = {
label["name"].lower(): label["name"] for label in existing_labels
}
# Get all configured labels with their aliases
labels_config = self.config.get("labels", {})
for category in ["type", "priority", "status"]:
category_config = labels_config.get(category, {})
mapping[category] = {}
for key, label_def in category_config.items():
config = self._get_label_config(category, key)
aliases = config.get("aliases", [])
# Try to find a match using aliases
for alias in aliases:
if alias.lower() in label_names_lower:
mapping[category][key] = label_names_lower[alias.lower()]
break
return mapping
def _setup_labels_map_mode(
self,
owner: str,
repo: str,
existing_labels: list[dict],
schema: dict,
existing_names: dict,
) -> str:
"""Map existing labels to OpenRabbit schema."""
# Build mapping
mapping = self._build_label_mapping(existing_labels, schema)
# Get required labels
required_labels = self._get_required_labels()
# Find missing labels
missing = []
for category, items in required_labels.items():
for key in items:
if key not in mapping.get(category, {}):
missing.append((category, key))
# Format report
lines = [f"{self.AI_DISCLAIMER}\n"]
lines.append("## Label Schema Detected\n")
lines.append(
f"Found {len(existing_labels)} existing labels with pattern: `{schema['pattern']}`\n"
)
lines.append("**Detected Categories:**")
for category, labels in schema["categories"].items():
lines.append(f"- **{category.title()}** ({len(labels)} labels)")
lines.append("")
lines.append("**Proposed Mapping:**\n")
lines.append("| OpenRabbit Expected | Your Existing Label | Status |")
lines.append("|---------------------|---------------------|--------|")
for category, items in required_labels.items():
for key in items:
openrabbit_config = self._get_label_config(category, key)
openrabbit_name = openrabbit_config["name"]
if key in mapping.get(category, {}):
existing_name = mapping[category][key]
lines.append(
f"| `{openrabbit_name}` | `{existing_name}` | ✅ Map |"
)
else:
lines.append(f"| `{openrabbit_name}` | *(missing)* | ⚠️ Create |")
lines.append("")
# Create missing labels
if missing:
lines.append(f"**Creating Missing Labels ({len(missing)}):**\n")
created_count = 0
for category, key in missing:
config = self._get_label_config(category, key)
suggested_name = self._suggest_label_name(
category, key, schema["pattern"]
)
# Check if label already exists (case-insensitive)
if suggested_name.lower() not in existing_names:
try:
self.gitea.create_label(
owner,
repo,
suggested_name,
config["color"],
config["description"],
)
lines.append(
f"✅ Created `{suggested_name}` (#{config['color']})"
)
created_count += 1
except Exception as e:
lines.append(f"❌ Failed to create `{suggested_name}`: {e}")
else:
lines.append(f"⚠️ `{suggested_name}` already exists")
lines.append("")
if created_count > 0:
lines.append(f"**✅ Created {created_count} new labels!**")
else:
lines.append("**✅ All Required Labels Present!**")
lines.append("\n**Setup Complete!**")
lines.append("Auto-labeling will use your existing label schema.")
return "\n".join(lines)
def _setup_labels_create_mode(
self, owner: str, repo: str, existing_names: dict
) -> str:
"""Create OpenRabbit default labels."""
lines = [f"{self.AI_DISCLAIMER}\n"]
lines.append("## Creating OpenRabbit Labels\n")
# Get all required labels
required_labels = self._get_required_labels()
created = []
skipped = []
failed = []
for category, items in required_labels.items():
for key in items:
config = self._get_label_config(category, key)
label_name = config["name"]
# Check if already exists (case-insensitive)
if label_name.lower() in existing_names:
skipped.append(label_name)
continue
try:
self.gitea.create_label(
owner, repo, label_name, config["color"], config["description"]
)
created.append((label_name, config["color"]))
except Exception as e:
failed.append((label_name, str(e)))
if created:
lines.append(f"**✅ Created {len(created)} Labels:**\n")
for name, color in created:
lines.append(f"- `{name}` (#{color})")
lines.append("")
if skipped:
lines.append(f"**⚠️ Skipped {len(skipped)} Existing Labels:**\n")
for name in skipped:
lines.append(f"- `{name}`")
lines.append("")
if failed:
lines.append(f"**❌ Failed to Create {len(failed)} Labels:**\n")
for name, error in failed:
lines.append(f"- `{name}`: {error}")
lines.append("")
lines.append("**✅ Setup Complete!**")
lines.append("Auto-labeling is now configured.")
return "\n".join(lines)
def _get_required_labels(self) -> dict:
"""Get all required label categories and keys.
Returns:
{
"type": ["bug", "feature", "question", "docs"],
"priority": ["high", "medium", "low"],
"status": ["ai_approved", "ai_changes_required", "ai_reviewed"]
}
"""
labels_config = self.config.get("labels", {})
required = {}
for category in ["type", "priority", "status"]:
category_config = labels_config.get(category, {})
required[category] = list(category_config.keys())
return required
def _suggest_label_name(self, category: str, key: str, pattern: str) -> str:
"""Suggest a label name based on detected pattern."""
# Get the configured name first
config = self._get_label_config(category, key)
base_name = config["name"]
if pattern == "prefix_slash":
prefix = "Kind" if category == "type" else category.title()
value = key.replace("_", " ").title()
return f"{prefix}/{value}"
elif pattern == "prefix_dash":
prefix = "Kind" if category == "type" else category.title()
value = key.replace("_", " ").title()
return f"{prefix} - {value}"
else: # colon or unknown
return base_name