first commit

This commit is contained in:
2025-12-21 13:42:30 +01:00
parent 823b825acb
commit f9b24fe248
47 changed files with 8222 additions and 1 deletions

View File

@@ -0,0 +1,19 @@
"""AI Review Agents Package
This package contains the modular agent implementations for the
enterprise AI code review system.
"""
from agents.base_agent import BaseAgent, AgentContext, AgentResult
from agents.issue_agent import IssueAgent
from agents.pr_agent import PRAgent
from agents.codebase_agent import CodebaseAgent
__all__ = [
"BaseAgent",
"AgentContext",
"AgentResult",
"IssueAgent",
"PRAgent",
"CodebaseAgent",
]

View File

@@ -0,0 +1,257 @@
"""Base Agent
Abstract base class for all AI agents. Provides common functionality
for Gitea API interaction, LLM calls, logging, and rate limiting.
"""
import logging
import os
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any
import yaml
from clients.gitea_client import GiteaClient
from clients.llm_client import LLMClient, LLMResponse
@dataclass
class AgentContext:
    """Context passed to agent during execution."""
    owner: str  # repository owner (user or organization)
    repo: str  # repository name
    event_type: str  # webhook event type, e.g. "issues" or "issue_comment"
    event_data: dict  # raw event payload from Gitea
    config: dict = field(default_factory=dict)  # per-run agent configuration overrides
@dataclass
class AgentResult:
    """Result from agent execution."""
    success: bool  # True when the agent completed without error
    message: str  # human-readable outcome summary
    data: dict = field(default_factory=dict)  # structured result payload
    actions_taken: list[str] = field(default_factory=list)  # audit trail of side effects performed
    error: str | None = None  # error description when success is False
class BaseAgent(ABC):
    """Abstract base class for AI agents.

    Provides the shared plumbing concrete agents rely on: Gitea API access,
    rate-limited LLM calls, prompt-template loading, and helpers for creating
    or updating marker-tagged comments. Subclasses implement `can_handle` and
    `execute`; callers should invoke `run`, which wraps `execute` with error
    handling.
    """

    # Marker to identify AI-generated comments
    AI_MARKER = "<!-- AI_CODE_REVIEW -->"

    # Disclaimer text
    AI_DISCLAIMER = (
        "**Note:** This review was generated by an AI assistant. "
        "While it aims to be accurate and helpful, it may contain mistakes "
        "or miss important issues. Please verify all findings before taking action."
    )

    def __init__(
        self,
        config: dict | None = None,
        gitea_client: GiteaClient | None = None,
        llm_client: LLMClient | None = None,
    ):
        """Initialize the base agent.

        Args:
            config: Agent configuration dictionary. Loaded from config.yml
                when omitted.
            gitea_client: Optional pre-configured Gitea client.
            llm_client: Optional pre-configured LLM client.
        """
        self.config = config or self._load_config()
        self.gitea = gitea_client or GiteaClient()
        self.llm = llm_client or LLMClient.from_config(self.config)
        self.logger = logging.getLogger(self.__class__.__name__)
        # Rate-limiting state: timestamp of the last request and the minimum
        # spacing enforced between consecutive LLM calls.
        self._last_request_time = 0.0
        self._min_request_interval = 1.0  # seconds

    @staticmethod
    def _load_config() -> dict:
        """Load configuration from config.yml.

        Returns:
            Parsed configuration, or {} when the file is missing or empty.
        """
        config_path = os.path.join(os.path.dirname(__file__), "..", "config.yml")
        if os.path.exists(config_path):
            with open(config_path, encoding="utf-8") as f:
                # yaml.safe_load returns None for an empty document; normalize
                # to {} so callers can chain .get() lookups safely.
                return yaml.safe_load(f) or {}
        return {}

    def _rate_limit(self):
        """Sleep just long enough to honor the minimum request interval."""
        elapsed = time.time() - self._last_request_time
        if elapsed < self._min_request_interval:
            time.sleep(self._min_request_interval - elapsed)
        self._last_request_time = time.time()

    def load_prompt(self, prompt_name: str) -> str:
        """Load a prompt template from the prompts directory.

        Args:
            prompt_name: Name of the prompt file (without extension).

        Returns:
            Prompt template content.

        Raises:
            FileNotFoundError: If no prompt file with that name exists.
        """
        prompt_path = os.path.join(
            os.path.dirname(__file__), "..", "prompts", f"{prompt_name}.md"
        )
        if not os.path.exists(prompt_path):
            raise FileNotFoundError(f"Prompt not found: {prompt_path}")
        with open(prompt_path, encoding="utf-8") as f:
            return f.read()

    def call_llm(self, prompt: str, **kwargs) -> LLMResponse:
        """Make a rate-limited call to the LLM.

        Args:
            prompt: The prompt to send.
            **kwargs: Additional LLM options.

        Returns:
            LLM response.
        """
        self._rate_limit()
        return self.llm.call(prompt, **kwargs)

    def call_llm_json(self, prompt: str, **kwargs) -> dict:
        """Make a rate-limited call and parse JSON response.

        Args:
            prompt: The prompt to send.
            **kwargs: Additional LLM options.

        Returns:
            Parsed JSON response.
        """
        self._rate_limit()
        return self.llm.call_json(prompt, **kwargs)

    def find_ai_comment(
        self,
        owner: str,
        repo: str,
        issue_index: int,
        marker: str | None = None,
    ) -> int | None:
        """Find an existing AI comment by marker.

        Args:
            owner: Repository owner.
            repo: Repository name.
            issue_index: Issue or PR number.
            marker: Custom marker to search for. Defaults to AI_MARKER.

        Returns:
            Comment ID of the first match, None otherwise.
        """
        marker = marker or self.AI_MARKER
        comments = self.gitea.list_issue_comments(owner, repo, issue_index)
        for comment in comments:
            if marker in comment.get("body", ""):
                return comment["id"]
        return None

    def upsert_comment(
        self,
        owner: str,
        repo: str,
        issue_index: int,
        body: str,
        marker: str | None = None,
    ) -> dict:
        """Create or update an AI comment.

        Args:
            owner: Repository owner.
            repo: Repository name.
            issue_index: Issue or PR number.
            body: Comment body (marker will be prepended if not present).
            marker: Custom marker. Defaults to AI_MARKER.

        Returns:
            Created or updated comment.
        """
        marker = marker or self.AI_MARKER
        # Ensure the marker is embedded so the comment can be found again later.
        if marker not in body:
            body = f"{marker}\n{body}"
        # Update in place when a marked comment already exists; otherwise create.
        existing_id = self.find_ai_comment(owner, repo, issue_index, marker)
        if existing_id:
            return self.gitea.update_issue_comment(owner, repo, existing_id, body)
        else:
            return self.gitea.create_issue_comment(owner, repo, issue_index, body)

    def format_with_disclaimer(self, content: str) -> str:
        """Add AI disclaimer to content.

        Args:
            content: The main content.

        Returns:
            Content with disclaimer (and AI marker) prepended.
        """
        return f"{self.AI_DISCLAIMER}\n\n{self.AI_MARKER}\n{content}"

    @abstractmethod
    def execute(self, context: AgentContext) -> AgentResult:
        """Execute the agent's main task.

        Args:
            context: Execution context with event data.

        Returns:
            Result of the agent execution.
        """
        pass

    @abstractmethod
    def can_handle(self, event_type: str, event_data: dict) -> bool:
        """Check if this agent can handle the given event.

        Args:
            event_type: Type of event (issue, pull_request, etc).
            event_data: Event payload data.

        Returns:
            True if this agent can handle the event.
        """
        pass

    def run(self, context: AgentContext) -> AgentResult:
        """Run the agent with error handling.

        Args:
            context: Execution context.

        Returns:
            Agent result, including any errors. Exceptions from `execute`
            are caught, logged with traceback, and converted into a failed
            AgentResult rather than propagated.
        """
        try:
            self.logger.info(
                f"Running {self.__class__.__name__} for {context.owner}/{context.repo}"
            )
            result = self.execute(context)
            self.logger.info(
                f"Completed with success={result.success}: {result.message}"
            )
            return result
        except Exception as e:
            self.logger.exception(f"Agent execution failed: {e}")
            return AgentResult(
                success=False,
                message="Agent execution failed",
                error=str(e),
            )

View File

@@ -0,0 +1,470 @@
"""Chat Agent (Bartender)
Interactive AI chat agent with tool use capabilities.
Can search the codebase and web to answer user questions.
"""
import base64
import logging
import os
import re
from dataclasses import dataclass
import requests
from agents.base_agent import AgentContext, AgentResult, BaseAgent
from clients.llm_client import ToolCall
@dataclass
class ChatMessage:
    """A message in the chat conversation."""
    role: str  # 'user', 'assistant', or 'tool'
    content: str  # message text
    tool_call_id: str | None = None  # id of the tool call this message responds to
    name: str | None = None  # Tool name for tool responses
class ChatAgent(BaseAgent):
    """Interactive chat agent with tool capabilities.

    Answers user questions posted as @-mentions in issue comments (or via a
    direct "chat" event) by running an LLM conversation loop in which the
    model may call tools to search the codebase, read repository files, or
    search the web through SearXNG.
    """

    # Marker for chat responses
    CHAT_AI_MARKER = "<!-- AI_CHAT_RESPONSE -->"

    # Tool definitions in OpenAI format
    TOOLS = [
        {
            "type": "function",
            "function": {
                "name": "search_codebase",
                "description": "Search the repository codebase for files, functions, classes, or patterns. Use this to find relevant code.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Search query - can be a filename, function name, class name, or code pattern",
                        },
                        "file_pattern": {
                            "type": "string",
                            "description": "Optional file pattern to filter results (e.g., '*.py', 'src/*.js')",
                        },
                    },
                    "required": ["query"],
                },
            },
        },
        {
            "type": "function",
            "function": {
                "name": "read_file",
                "description": "Read the contents of a specific file from the repository.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "filepath": {
                            "type": "string",
                            "description": "Path to the file to read",
                        },
                    },
                    "required": ["filepath"],
                },
            },
        },
        {
            "type": "function",
            "function": {
                "name": "search_web",
                "description": "Search the web for information using SearXNG. Use this for external documentation, tutorials, or general knowledge.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Search query",
                        },
                        "categories": {
                            "type": "string",
                            "description": "Optional: comma-separated categories (general, images, videos, news, science, it)",
                        },
                    },
                    "required": ["query"],
                },
            },
        },
    ]

    # System prompt for the chat agent
    SYSTEM_PROMPT = """You are Bartender, a helpful AI assistant for code review and development tasks.
You have access to tools to help answer questions:
- search_codebase: Search the repository for code, files, functions, or patterns
- read_file: Read specific files from the repository
- search_web: Search the web for documentation, tutorials, or external information
When helping users:
1. Use tools to gather information before answering questions about code
2. Be concise but thorough in your explanations
3. Provide code examples when helpful
4. If you're unsure, say so and suggest alternatives
Repository context: {owner}/{repo}
"""

    def __init__(self, *args, **kwargs):
        """Initialize the agent and resolve the SearXNG endpoint.

        Resolution order: agents.chat.searxng_url in config, then the
        SEARXNG_URL environment variable, then "" (web search disabled).
        """
        super().__init__(*args, **kwargs)
        self._searxng_url = self.config.get("agents", {}).get("chat", {}).get(
            "searxng_url", os.environ.get("SEARXNG_URL", "")
        )

    def can_handle(self, event_type: str, event_data: dict) -> bool:
        """Check if this agent handles the given event.

        Chat is the fallback for any @-mention that is not one of the
        specific commands owned by other agents.
        """
        agent_config = self.config.get("agents", {}).get("chat", {})
        if not agent_config.get("enabled", True):
            return False
        # Handle issue comment with @ai-bot chat or just @ai-bot
        if event_type == "issue_comment":
            comment_body = event_data.get("comment", {}).get("body", "")
            mention_prefix = self.config.get("interaction", {}).get(
                "mention_prefix", "@ai-bot"
            )
            if mention_prefix in comment_body:
                # Defer to other agents when the mention carries a specific
                # command. The body is lowercased, so compare against the
                # lowercased prefix too — otherwise a configured prefix with
                # uppercase letters would never match and specific commands
                # would wrongly fall through to chat.
                specific_commands = ["summarize", "explain", "suggest", "security", "codebase"]
                body_lower = comment_body.lower()
                prefix_lower = mention_prefix.lower()
                for cmd in specific_commands:
                    if f"{prefix_lower} {cmd}" in body_lower:
                        return False
                return True
        # Handle direct chat command
        if event_type == "chat":
            return True
        return False

    def execute(self, context: AgentContext) -> AgentResult:
        """Execute the chat agent.

        Extracts the user's message, runs the tool-use conversation loop,
        and (for issue comments) posts the answer back to the issue.
        """
        self.logger.info(f"Starting chat for {context.owner}/{context.repo}")
        # Extract the user message from either event shape.
        if context.event_type == "issue_comment":
            user_message = context.event_data.get("comment", {}).get("body", "")
            issue_index = context.event_data.get("issue", {}).get("number")
            # Remove the @ai-bot prefix
            mention_prefix = self.config.get("interaction", {}).get(
                "mention_prefix", "@ai-bot"
            )
            user_message = user_message.replace(mention_prefix, "").strip()
        else:
            user_message = context.event_data.get("message", "")
            issue_index = context.event_data.get("issue_number")
        if not user_message:
            return AgentResult(
                success=False,
                message="No message provided",
            )
        # Build the initial conversation: system prompt + user turn.
        system_prompt = self.SYSTEM_PROMPT.format(
            owner=context.owner,
            repo=context.repo,
        )
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ]
        # Run the chat loop with tool execution
        response_content, tools_used = self._run_chat_loop(
            context, messages, max_iterations=5
        )
        actions_taken = []
        if tools_used:
            actions_taken.append(f"Used tools: {', '.join(tools_used)}")
        # Post the answer back when the chat originated from an issue comment.
        if issue_index:
            comment_body = self._format_response(response_content)
            self.upsert_comment(
                context.owner,
                context.repo,
                issue_index,
                comment_body,
                marker=self.CHAT_AI_MARKER,
            )
            actions_taken.append("Posted chat response")
        return AgentResult(
            success=True,
            message="Chat completed",
            data={"response": response_content, "tools_used": tools_used},
            actions_taken=actions_taken,
        )

    def _run_chat_loop(
        self,
        context: AgentContext,
        messages: list[dict],
        max_iterations: int = 5,
    ) -> tuple[str, list[str]]:
        """Run the chat loop with tool execution.

        Each iteration asks the LLM for a turn; when the model requests tool
        calls, they are executed and their results appended, then the loop
        continues. Mutates `messages` in place.

        Returns:
            Tuple of (final response content, list of tools used).
        """
        import json  # local import, matching this module's local-import style

        tools_used = []
        for _ in range(max_iterations):
            self._rate_limit()
            response = self.llm.call_with_tools(messages, tools=self.TOOLS)
            # No tool calls means the model produced its final answer.
            if not response.tool_calls:
                return response.content, tools_used
            # Record the assistant turn, including its tool calls.
            messages.append({
                "role": "assistant",
                "content": response.content or "",
                "tool_calls": [
                    {
                        "id": tc.id,
                        "type": "function",
                        "function": {
                            "name": tc.name,
                            # The OpenAI tool-call format requires "arguments"
                            # to be a JSON string; str(dict) would yield Python
                            # repr (single quotes), which is not valid JSON.
                            "arguments": json.dumps(tc.arguments),
                        },
                    }
                    for tc in response.tool_calls
                ],
            })
            # Execute each requested tool and feed the result back.
            for tool_call in response.tool_calls:
                tool_result = self._execute_tool(context, tool_call)
                tools_used.append(tool_call.name)
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": tool_result,
                })
        # Iteration budget exhausted: force a final answer without tools.
        self._rate_limit()
        final_response = self.llm.call_with_tools(
            messages, tools=None, tool_choice="none"
        )
        return final_response.content, tools_used

    def _execute_tool(self, context: AgentContext, tool_call: ToolCall) -> str:
        """Execute a tool call and return the result as text.

        Never raises: failures are returned as error strings so the model
        can react to them.
        """
        self.logger.info(f"Executing tool: {tool_call.name}")
        try:
            if tool_call.name == "search_codebase":
                return self._tool_search_codebase(
                    context,
                    tool_call.arguments.get("query", ""),
                    tool_call.arguments.get("file_pattern"),
                )
            elif tool_call.name == "read_file":
                return self._tool_read_file(
                    context,
                    tool_call.arguments.get("filepath", ""),
                )
            elif tool_call.name == "search_web":
                return self._tool_search_web(
                    tool_call.arguments.get("query", ""),
                    tool_call.arguments.get("categories"),
                )
            else:
                return f"Unknown tool: {tool_call.name}"
        except Exception as e:
            self.logger.error(f"Tool execution failed: {e}")
            return f"Error executing tool: {e}"

    def _tool_search_codebase(
        self,
        context: AgentContext,
        query: str,
        file_pattern: str | None = None,
    ) -> str:
        """Search the codebase for files matching a query.

        Matches on file path first, then on content (case-insensitive
        substring). Both file and result counts are capped to protect the
        Gitea API and the LLM context window.
        """
        results = []
        try:
            files = self._collect_files(context.owner, context.repo, file_pattern)
        except Exception as e:
            return f"Error listing files: {e}"
        query_lower = query.lower()
        for file_info in files[:50]:  # Limit to prevent API exhaustion
            filepath = file_info.get("path", "")
            # A path match is enough; skip the content fetch in that case.
            if query_lower in filepath.lower():
                results.append(f"File: {filepath}")
                continue
            # Otherwise fetch and scan the file's content.
            try:
                content_data = self.gitea.get_file_contents(
                    context.owner, context.repo, filepath
                )
                if content_data.get("content"):
                    content = base64.b64decode(content_data["content"]).decode(
                        "utf-8", errors="ignore"
                    )
                    lines = content.splitlines()
                    matching_lines = []
                    for i, line in enumerate(lines, 1):
                        if query_lower in line.lower():
                            matching_lines.append(f"  L{i}: {line.strip()[:100]}")
                    if matching_lines:
                        results.append(f"File: {filepath}")
                        results.extend(matching_lines[:5])  # Max 5 matches per file
            except Exception:
                # Best-effort search: unreadable files are silently skipped.
                pass
        if not results:
            return f"No results found for '{query}'"
        return "\n".join(results[:30])  # Limit total results

    def _collect_files(
        self,
        owner: str,
        repo: str,
        file_pattern: str | None = None,
    ) -> list[dict]:
        """Collect code files from the repository.

        Recursively walks the tree via the Gitea contents API, keeping only
        known code/config extensions, skipping vendored and generated paths,
        and capping the result size.
        """
        files = []
        # Code extensions to search
        code_extensions = {
            ".py", ".js", ".ts", ".go", ".rs", ".java", ".rb",
            ".php", ".c", ".cpp", ".h", ".cs", ".swift", ".kt",
            ".md", ".yml", ".yaml", ".json", ".toml",
        }
        # Patterns to ignore
        ignore_patterns = [
            "node_modules/", "vendor/", ".git/", "__pycache__/",
            ".venv/", "dist/", "build/", ".min.js", ".min.css",
        ]

        def traverse(path: str = ""):
            try:
                contents = self.gitea.get_file_contents(owner, repo, path or ".")
                if isinstance(contents, list):
                    for item in contents:
                        item_path = item.get("path", "")
                        if any(p in item_path for p in ignore_patterns):
                            continue
                        if item.get("type") == "file":
                            ext = os.path.splitext(item_path)[1]
                            if ext in code_extensions:
                                # Apply the optional caller-provided glob filter.
                                if file_pattern:
                                    if not self._match_pattern(item_path, file_pattern):
                                        continue
                                files.append(item)
                        elif item.get("type") == "dir":
                            traverse(item_path)
            except Exception as e:
                self.logger.warning(f"Failed to list {path}: {e}")

        traverse()
        return files[:100]  # Limit to prevent API exhaustion

    def _match_pattern(self, filepath: str, pattern: str) -> bool:
        """Check if filepath matches a simple glob pattern."""
        import fnmatch
        return fnmatch.fnmatch(filepath, pattern)

    def _tool_read_file(self, context: AgentContext, filepath: str) -> str:
        """Read a file from the repository, truncated for LLM consumption."""
        try:
            content_data = self.gitea.get_file_contents(
                context.owner, context.repo, filepath
            )
            if content_data.get("content"):
                content = base64.b64decode(content_data["content"]).decode(
                    "utf-8", errors="ignore"
                )
                # Truncate to keep the tool result within a sane context size.
                if len(content) > 8000:
                    content = content[:8000] + "\n... (truncated)"
                return f"File: {filepath}\n\n```\n{content}\n```"
            return f"File not found: {filepath}"
        except Exception as e:
            return f"Error reading file: {e}"

    def _tool_search_web(
        self,
        query: str,
        categories: str | None = None,
    ) -> str:
        """Search the web using SearXNG and format the top results."""
        if not self._searxng_url:
            return "Web search is not configured. Set SEARXNG_URL environment variable."
        try:
            params = {
                "q": query,
                "format": "json",
            }
            if categories:
                params["categories"] = categories
            response = requests.get(
                f"{self._searxng_url}/search",
                params=params,
                timeout=30,
            )
            response.raise_for_status()
            data = response.json()
            results = data.get("results", [])
            if not results:
                return f"No web results found for '{query}'"
            # Format the top results as a compact markdown list.
            output = []
            for i, result in enumerate(results[:5], 1):  # Top 5 results
                title = result.get("title", "No title")
                url = result.get("url", "")
                content = result.get("content", "")[:200]
                output.append(f"{i}. **{title}**\n   {url}\n   {content}")
            return "\n\n".join(output)
        except requests.exceptions.RequestException as e:
            return f"Web search failed: {e}"

    def _format_response(self, content: str) -> str:
        """Format the chat response with the standard AI disclaimer."""
        lines = [
            f"{self.AI_DISCLAIMER}",
            "",
            "---",
            "",
            content,
        ]
        return "\n".join(lines)

View File

@@ -0,0 +1,457 @@
"""Codebase Quality Agent
AI agent for analyzing overall codebase health, architecture,
technical debt, and documentation coverage.
"""
import base64
import os
from dataclasses import dataclass, field
from agents.base_agent import AgentContext, AgentResult, BaseAgent
@dataclass
class CodebaseMetrics:
    """Metrics collected from codebase analysis."""
    total_files: int = 0  # number of code files found (capped during collection)
    total_lines: int = 0  # total line count across the analyzed files
    languages: dict = field(default_factory=dict)  # language name -> file count
    todo_count: int = 0  # lines containing "TODO" (case-insensitive)
    fixme_count: int = 0  # lines containing "FIXME" (case-insensitive)
    deprecated_count: int = 0  # lines containing "DEPRECATED" (case-insensitive)
    missing_docstrings: int = 0  # Python files that define functions but contain no docstring
@dataclass
class CodebaseReport:
    """Complete codebase analysis report."""
    summary: str  # short overall assessment
    health_score: float  # 0-100
    metrics: CodebaseMetrics  # raw metrics backing the report
    issues: list[dict]  # each dict: severity / category / description / recommendation
    recommendations: list[str]  # top actionable recommendations
    architecture_notes: list[str]  # observations about code structure and patterns
class CodebaseAgent(BaseAgent):
    """Agent for codebase quality analysis.

    Walks the repository tree, gathers simple metrics (languages, line
    counts, TODO/FIXME markers), asks the LLM for a structured quality
    assessment, and publishes the result as a persistent report issue.
    """

    # Marker for codebase reports
    CODEBASE_AI_MARKER = "<!-- AI_CODEBASE_REVIEW -->"

    # File extensions to analyze
    CODE_EXTENSIONS = {
        ".py": "Python",
        ".js": "JavaScript",
        ".ts": "TypeScript",
        ".go": "Go",
        ".rs": "Rust",
        ".java": "Java",
        ".rb": "Ruby",
        ".php": "PHP",
        ".c": "C",
        ".cpp": "C++",
        ".h": "C/C++ Header",
        ".cs": "C#",
        ".swift": "Swift",
        ".kt": "Kotlin",
    }

    # Files to ignore
    IGNORE_PATTERNS = [
        "node_modules/",
        "vendor/",
        ".git/",
        "__pycache__/",
        ".venv/",
        "dist/",
        "build/",
        ".min.js",
        ".min.css",
    ]

    def can_handle(self, event_type: str, event_data: dict) -> bool:
        """Check if this agent handles the given event."""
        agent_config = self.config.get("agents", {}).get("codebase", {})
        if not agent_config.get("enabled", True):
            return False
        # Handle manual trigger via workflow_dispatch or schedule
        if event_type in ("workflow_dispatch", "schedule"):
            return True
        # Handle special issue command
        if event_type == "issue_comment":
            comment_body = event_data.get("comment", {}).get("body", "")
            mention_prefix = self.config.get("interaction", {}).get(
                "mention_prefix", "@ai-bot"
            )
            if f"{mention_prefix} codebase" in comment_body.lower():
                return True
        return False

    def execute(self, context: AgentContext) -> AgentResult:
        """Execute codebase analysis end-to-end.

        Collects files, computes metrics, runs the AI assessment, and
        creates or updates the report issue.
        """
        self.logger.info(f"Starting codebase analysis for {context.owner}/{context.repo}")
        actions_taken = []
        # Step 1: Collect file list from repository
        files = self._collect_files(context.owner, context.repo)
        self.logger.info(f"Found {len(files)} files to analyze")
        # Step 2: Analyze metrics
        metrics = self._analyze_metrics(context.owner, context.repo, files)
        actions_taken.append(f"Analyzed {metrics.total_files} files")
        # Step 3: Run AI analysis on key files
        report = self._run_ai_analysis(context, files, metrics)
        actions_taken.append("Generated AI analysis report")
        # Step 4: Create or update report issue
        issue_number = self._create_report_issue(context, report)
        actions_taken.append(f"Created/updated report issue #{issue_number}")
        return AgentResult(
            success=True,
            message=f"Codebase analysis complete - Health Score: {report.health_score:.0f}/100",
            data={
                "health_score": report.health_score,
                "total_files": metrics.total_files,
                "issues_found": len(report.issues),
            },
            actions_taken=actions_taken,
        )

    def _collect_files(self, owner: str, repo: str) -> list[dict]:
        """Collect list of code files from the repository.

        Recursively walks the tree via the contents API, skipping ignored
        paths and non-code extensions; the result is capped to protect the
        Gitea API.
        """
        files = []

        def traverse(path: str = ""):
            try:
                contents = self.gitea.get_file_contents(owner, repo, path or ".")
                if isinstance(contents, list):
                    for item in contents:
                        item_path = item.get("path", "")
                        # Skip ignored patterns
                        if any(p in item_path for p in self.IGNORE_PATTERNS):
                            continue
                        if item.get("type") == "file":
                            ext = os.path.splitext(item_path)[1]
                            if ext in self.CODE_EXTENSIONS:
                                files.append(item)
                        elif item.get("type") == "dir":
                            traverse(item_path)
            except Exception as e:
                self.logger.warning(f"Failed to list {path}: {e}")

        traverse()
        return files[:100]  # Limit to prevent API exhaustion

    def _analyze_metrics(
        self,
        owner: str,
        repo: str,
        files: list[dict],
    ) -> CodebaseMetrics:
        """Analyze metrics from files.

        Counts lines, language distribution, and TODO/FIXME/DEPRECATED
        markers. Only the first 50 files are content-scanned; total_files
        still reflects the whole collected list.
        """
        metrics = CodebaseMetrics()
        metrics.total_files = len(files)
        for file_info in files[:50]:  # Analyze top 50 files
            filepath = file_info.get("path", "")
            ext = os.path.splitext(filepath)[1]
            lang = self.CODE_EXTENSIONS.get(ext, "Unknown")
            metrics.languages[lang] = metrics.languages.get(lang, 0) + 1
            try:
                content_data = self.gitea.get_file_contents(owner, repo, filepath)
                if content_data.get("content"):
                    content = base64.b64decode(content_data["content"]).decode(
                        "utf-8", errors="ignore"
                    )
                    lines = content.splitlines()
                    metrics.total_lines += len(lines)
                    # Count technical-debt markers (case-insensitive).
                    for line in lines:
                        line_upper = line.upper()
                        if "TODO" in line_upper:
                            metrics.todo_count += 1
                        if "FIXME" in line_upper:
                            metrics.fixme_count += 1
                        if "DEPRECATED" in line_upper:
                            metrics.deprecated_count += 1
                    # Crude per-file docstring check (Python only): flags files
                    # that define functions but contain no triple-quoted string.
                    if ext == ".py":
                        if 'def ' in content and '"""' not in content:
                            metrics.missing_docstrings += 1
            except Exception as e:
                self.logger.debug(f"Could not analyze {filepath}: {e}")
        return metrics

    def _run_ai_analysis(
        self,
        context: AgentContext,
        files: list[dict],
        metrics: CodebaseMetrics,
    ) -> CodebaseReport:
        """Run AI analysis on the codebase.

        Falls back to a heuristic metrics-only report when the LLM call or
        JSON parsing fails.
        """
        # Prepare context for AI
        file_list = "\n".join([f"- {f.get('path', '')}" for f in files[:30]])
        language_breakdown = "\n".join(
            [f"- {lang}: {count} files" for lang, count in metrics.languages.items()]
        )
        # Sample some key files for deeper analysis
        key_files_content = self._get_key_files_content(
            context.owner, context.repo, files
        )
        prompt = f"""Analyze this codebase and provide a comprehensive quality assessment.
## Repository: {context.owner}/{context.repo}
## Metrics
- Total Files: {metrics.total_files}
- Total Lines: {metrics.total_lines}
- TODO Comments: {metrics.todo_count}
- FIXME Comments: {metrics.fixme_count}
- Deprecated Markers: {metrics.deprecated_count}
## Language Breakdown
{language_breakdown}
## File Structure (sample)
{file_list}
## Key Files Content
{key_files_content}
## Analysis Required
Provide your analysis as JSON with this structure:
```json
{{
"summary": "Overall assessment in 2-3 sentences",
"health_score": 0-100,
"issues": [
{{
"severity": "HIGH|MEDIUM|LOW",
"category": "Architecture|Code Quality|Security|Testing|Documentation",
"description": "Issue description",
"recommendation": "How to fix"
}}
],
"recommendations": ["Top 3-5 actionable recommendations"],
"architecture_notes": ["Observations about code structure and patterns"]
}}
```
Be constructive and actionable. Focus on the most impactful improvements.
"""
        try:
            result = self.call_llm_json(prompt)
            return CodebaseReport(
                summary=result.get("summary", "Analysis complete"),
                health_score=float(result.get("health_score", 50)),
                metrics=metrics,
                issues=result.get("issues", []),
                recommendations=result.get("recommendations", []),
                architecture_notes=result.get("architecture_notes", []),
            )
        except Exception as e:
            self.logger.error(f"AI analysis failed: {e}")
            # Degrade gracefully: derive a rough health score from the
            # debt markers so a report is still produced.
            health_score = 70
            if metrics.todo_count > 10:
                health_score -= 10
            if metrics.fixme_count > 5:
                health_score -= 10
            return CodebaseReport(
                summary=f"Basic analysis complete (AI unavailable: {e})",
                health_score=health_score,
                metrics=metrics,
                issues=[],
                recommendations=["Manual review recommended"],
                architecture_notes=[],
            )

    def _get_key_files_content(
        self,
        owner: str,
        repo: str,
        files: list[dict],
    ) -> str:
        """Get content of key project/config files for AI analysis."""
        key_file_names = [
            "README.md",
            "setup.py",
            "pyproject.toml",
            "package.json",
            "Cargo.toml",
            "go.mod",
            "Makefile",
            "Dockerfile",
        ]
        content_parts = []
        for file_info in files:
            filepath = file_info.get("path", "")
            filename = os.path.basename(filepath)
            if filename in key_file_names:
                try:
                    content_data = self.gitea.get_file_contents(owner, repo, filepath)
                    if content_data.get("content"):
                        content = base64.b64decode(content_data["content"]).decode(
                            "utf-8", errors="ignore"
                        )
                        # Truncate long files
                        if len(content) > 2000:
                            content = content[:2000] + "\n... (truncated)"
                        content_parts.append(f"### {filepath}\n```\n{content}\n```")
                except Exception:
                    # Best effort: skip files that cannot be fetched/decoded.
                    pass
        return "\n\n".join(content_parts[:5]) or "No key configuration files found."

    def _create_report_issue(
        self,
        context: AgentContext,
        report: CodebaseReport,
    ) -> int:
        """Create or update the persistent report issue.

        Returns the issue number, or 0 when creation failed.
        """
        # Generate issue body
        body = self._generate_report_body(report)
        # Prefer updating an existing open report issue over creating a new one.
        try:
            issues = self.gitea.list_issues(
                context.owner, context.repo, state="open", labels=["ai-codebase-report"]
            )
            for issue in issues:
                if self.CODEBASE_AI_MARKER in issue.get("body", ""):
                    self.gitea.update_issue(
                        context.owner,
                        context.repo,
                        issue["number"],
                        body=body,
                    )
                    return issue["number"]
        except Exception as e:
            self.logger.warning(f"Failed to check for existing report: {e}")
        # Create new issue
        try:
            # Resolve the numeric label ID if the repo defines the label.
            labels = []
            try:
                repo_labels = self.gitea.get_repo_labels(context.owner, context.repo)
                for label in repo_labels:
                    if label["name"] == "ai-codebase-report":
                        labels.append(label["id"])
                        break
            except Exception:
                pass
            issue = self.gitea.create_issue(
                context.owner,
                context.repo,
                title=f"AI Codebase Report - {context.repo}",
                body=body,
                labels=labels,
            )
            return issue["number"]
        except Exception as e:
            self.logger.error(f"Failed to create report issue: {e}")
            return 0

    def _generate_report_body(self, report: CodebaseReport) -> str:
        """Generate the markdown report issue body."""
        # Traffic-light indicator for the overall score (was previously
        # computed but never rendered).
        health_emoji = "🟢" if report.health_score >= 80 else ("🟡" if report.health_score >= 60 else "🔴")
        lines = [
            f"{self.AI_DISCLAIMER}",
            "",
            "# AI Codebase Quality Report",
            "",
            f"## Health Score: {health_emoji} {report.health_score:.0f}/100",
            "",
            report.summary,
            "",
            "---",
            "",
            "## Metrics",
            "",
            "| Metric | Value |",
            "|--------|-------|",
            f"| Total Files | {report.metrics.total_files} |",
            f"| Total Lines | {report.metrics.total_lines:,} |",
            f"| TODO Comments | {report.metrics.todo_count} |",
            f"| FIXME Comments | {report.metrics.fixme_count} |",
            f"| Deprecated | {report.metrics.deprecated_count} |",
            "",
        ]
        # Languages
        if report.metrics.languages:
            lines.append("### Languages")
            lines.append("")
            for lang, count in sorted(
                report.metrics.languages.items(), key=lambda x: -x[1]
            ):
                lines.append(f"- **{lang}**: {count} files")
            lines.append("")
        # Issues
        if report.issues:
            lines.append("## Issues Found")
            lines.append("")
            for issue in report.issues[:10]:
                severity = issue.get("severity", "MEDIUM")
                # Severity indicator (was previously computed but never rendered).
                emoji = "🔴" if severity == "HIGH" else ("🟡" if severity == "MEDIUM" else "🟢")
                lines.append(f"### {emoji} [{severity}] {issue.get('category', 'General')}")
                lines.append("")
                lines.append(issue.get("description", ""))
                lines.append("")
                lines.append(f"**Recommendation:** {issue.get('recommendation', '')}")
                lines.append("")
        # Recommendations
        if report.recommendations:
            lines.append("## Recommendations")
            lines.append("")
            for i, rec in enumerate(report.recommendations[:5], 1):
                lines.append(f"{i}. {rec}")
            lines.append("")
        # Architecture notes
        if report.architecture_notes:
            lines.append("## Architecture Notes")
            lines.append("")
            for note in report.architecture_notes[:5]:
                lines.append(f"- {note}")
            lines.append("")
        lines.append("---")
        lines.append("*Generated by AI Codebase Agent*")
        return "\n".join(lines)

View File

@@ -0,0 +1,392 @@
"""Issue Review Agent
AI agent for triaging, labeling, and responding to issues.
Handles issue.opened, issue.labeled, and issue_comment events.
"""
import logging
from dataclasses import dataclass
from agents.base_agent import AgentContext, AgentResult, BaseAgent
@dataclass
class TriageResult:
    """Result of issue triage analysis."""
    issue_type: str  # classified type of the issue
    priority: str  # triage priority level
    confidence: float  # model confidence in the classification — presumably 0-1; TODO confirm against _triage_issue
    summary: str  # short summary of the issue
    suggested_labels: list[str]  # labels the agent proposes to apply
    is_duplicate: bool  # True when the issue appears to duplicate another
    duplicate_of: int | None  # issue number of the suspected duplicate, if any
    needs_more_info: bool  # True when the report lacks required details
    missing_info: list[str]  # details the reporter should provide
    components: list[str]  # affected components, as identified by the model
    reasoning: str  # model's explanation of the triage decision
class IssueAgent(BaseAgent):
    """Agent for handling issue events.

    Triages newly opened issues with the LLM, applies labels, posts a
    triage summary comment, and answers @mention commands in comments.
    """

    # Marker specific to issue comments (used to upsert a single triage comment)
    ISSUE_AI_MARKER = "<!-- AI_ISSUE_TRIAGE -->"

    def can_handle(self, event_type: str, event_data: dict) -> bool:
        """Check if this agent handles the given event."""
        # Respect the per-agent enable flag.
        agent_config = self.config.get("agents", {}).get("issue", {})
        if not agent_config.get("enabled", True):
            return False
        # Handle issue events
        if event_type == "issues":
            action = event_data.get("action", "")
            allowed_events = agent_config.get("events", ["opened", "labeled"])
            if action not in allowed_events:
                return False
            # Ignore our own codebase reports to prevent double-commenting
            issue = event_data.get("issue", {})
            title = issue.get("title", "")
            label_names = [label.get("name") for label in issue.get("labels", [])]
            if "AI Codebase Report" in title or "ai-codebase-report" in label_names:
                return False
            return True
        # Handle issue comment events (for @mentions)
        if event_type == "issue_comment":
            action = event_data.get("action", "")
            if action == "created":
                comment_body = event_data.get("comment", {}).get("body", "")
                mention_prefix = self.config.get("interaction", {}).get(
                    "mention_prefix", "@ai-bot"
                )
                return mention_prefix in comment_body
        return False

    def execute(self, context: AgentContext) -> AgentResult:
        """Execute the issue agent.

        Dispatches to a handler based on event type and action.
        """
        event_data = context.event_data
        action = event_data.get("action", "")
        if context.event_type == "issues":
            if action == "opened":
                return self._handle_issue_opened(context)
            elif action == "labeled":
                return self._handle_issue_labeled(context)
        if context.event_type == "issue_comment":
            return self._handle_issue_comment(context)
        return AgentResult(
            success=False,
            message=f"Unknown action: {action}",
        )

    def _handle_issue_opened(self, context: AgentContext) -> AgentResult:
        """Handle a newly opened issue: triage, label, and comment."""
        issue = context.event_data.get("issue", {})
        issue_index = issue.get("number")
        title = issue.get("title", "")
        body = issue.get("body", "")
        author = issue.get("user", {}).get("login", "unknown")
        existing_labels = [label.get("name", "") for label in issue.get("labels", [])]
        self.logger.info(f"Triaging issue #{issue_index}: {title}")
        # Step 1: Triage the issue
        triage = self._triage_issue(title, body, author, existing_labels)
        actions_taken = []
        # Step 2: Apply labels if auto-label is enabled
        agent_config = self.config.get("agents", {}).get("issue", {})
        if agent_config.get("auto_label", True):
            labels_applied = self._apply_labels(
                context.owner, context.repo, issue_index, triage
            )
            if labels_applied:
                actions_taken.append(f"Applied labels: {labels_applied}")
        # Step 3: Post triage comment (upserted so re-triage replaces it)
        comment = self._generate_triage_comment(triage, issue)
        self.upsert_comment(
            context.owner,
            context.repo,
            issue_index,
            comment,
            marker=self.ISSUE_AI_MARKER,
        )
        actions_taken.append("Posted triage comment")
        return AgentResult(
            success=True,
            message=f"Triaged issue #{issue_index} as {triage.issue_type} ({triage.priority} priority)",
            data={
                "triage": {
                    "type": triage.issue_type,
                    "priority": triage.priority,
                    "confidence": triage.confidence,
                }
            },
            actions_taken=actions_taken,
        )

    def _handle_issue_labeled(self, context: AgentContext) -> AgentResult:
        """Handle label addition to an issue."""
        # Could be used for specific label-triggered actions
        issue = context.event_data.get("issue", {})
        label = context.event_data.get("label", {})
        return AgentResult(
            success=True,
            message=f"Noted label '{label.get('name')}' added to issue #{issue.get('number')}",
        )

    def _handle_issue_comment(self, context: AgentContext) -> AgentResult:
        """Handle @mention in issue comment."""
        issue = context.event_data.get("issue", {})
        comment = context.event_data.get("comment", {})
        issue_index = issue.get("number")
        comment_body = comment.get("body", "")
        # Parse command from mention
        command = self._parse_command(comment_body)
        if command:
            response = self._handle_command(context, issue, command)
            self.gitea.create_issue_comment(
                context.owner, context.repo, issue_index, response
            )
            return AgentResult(
                success=True,
                message=f"Responded to command: {command}",
                actions_taken=["Posted command response"],
            )
        return AgentResult(
            success=True,
            message="No actionable command found in mention",
        )

    def _triage_issue(
        self,
        title: str,
        body: str,
        author: str,
        existing_labels: list[str],
    ) -> TriageResult:
        """Use LLM to triage the issue.

        Returns a conservative default TriageResult (needs_more_info=True)
        when the LLM call or JSON parsing fails, so a failure still leaves
        an actionable signal for human reviewers.
        """
        prompt_template = self.load_prompt("issue_triage")
        prompt = prompt_template.format(
            title=title,
            body=body or "(no description provided)",
            author=author,
            existing_labels=", ".join(existing_labels) if existing_labels else "none",
        )
        try:
            result = self.call_llm_json(prompt)
            return TriageResult(
                issue_type=result.get("type", "question"),
                priority=result.get("priority", "medium"),
                confidence=result.get("confidence", 0.5),
                summary=result.get("summary", title),
                suggested_labels=result.get("suggested_labels", []),
                is_duplicate=result.get("is_duplicate", False),
                duplicate_of=result.get("duplicate_of"),
                needs_more_info=result.get("needs_more_info", False),
                missing_info=result.get("missing_info", []),
                components=result.get("components", []),
                reasoning=result.get("reasoning", ""),
            )
        except Exception as e:
            self.logger.warning(f"LLM triage failed: {e}")
            # Return default triage on failure
            return TriageResult(
                issue_type="question",
                priority="medium",
                confidence=0.3,
                summary=title,
                suggested_labels=[],
                is_duplicate=False,
                duplicate_of=None,
                needs_more_info=True,
                missing_info=["Unable to parse issue automatically"],
                components=[],
                reasoning="Automatic triage failed, needs human review",
            )

    def _apply_labels(
        self,
        owner: str,
        repo: str,
        issue_index: int,
        triage: TriageResult,
    ) -> list[str]:
        """Apply labels based on triage result.

        Returns the names of the labels applied; always returns a list
        (the original could fall through and return None when nothing
        mapped, despite the list[str] annotation).
        """
        labels_config = self.config.get("labels", {})
        # Get all repo labels so we can map configured names to label IDs.
        try:
            repo_labels = self.gitea.get_repo_labels(owner, repo)
            label_map = {label["name"]: label["id"] for label in repo_labels}
        except Exception as e:
            self.logger.warning(f"Failed to get repo labels: {e}")
            return []
        labels_to_add = []
        # Map priority
        priority_labels = labels_config.get("priority", {})
        priority_label = priority_labels.get(triage.priority)
        if priority_label and priority_label in label_map:
            labels_to_add.append(label_map[priority_label])
        # Map type
        type_labels = labels_config.get("type", {})
        type_label = type_labels.get(triage.issue_type)
        if type_label and type_label in label_map:
            labels_to_add.append(label_map[type_label])
        # Add AI reviewed label
        status_labels = labels_config.get("status", {})
        reviewed_label = status_labels.get("ai_reviewed")
        if reviewed_label and reviewed_label in label_map:
            labels_to_add.append(label_map[reviewed_label])
        if labels_to_add:
            try:
                self.gitea.add_issue_labels(owner, repo, issue_index, labels_to_add)
                return [
                    name
                    for name, label_id in label_map.items()
                    if label_id in labels_to_add
                ]
            except Exception as e:
                self.logger.warning(f"Failed to add labels: {e}")
        return []

    def _generate_triage_comment(self, triage: TriageResult, issue: dict) -> str:
        """Generate a triage summary comment (markdown)."""
        lines = [
            f"{self.AI_DISCLAIMER}",
            "",
            "## AI Issue Triage",
            "",
            "| Field | Value |",
            "|-------|--------|",
            f"| **Type** | {triage.issue_type.capitalize()} |",
            f"| **Priority** | {triage.priority.capitalize()} |",
            f"| **Confidence** | {triage.confidence:.0%} |",
            "",
        ]
        # Only show the summary when it adds information beyond the title.
        if triage.summary != issue.get("title"):
            lines.append(f"**Summary:** {triage.summary}")
            lines.append("")
        if triage.components:
            lines.append(f"**Components:** {', '.join(triage.components)}")
            lines.append("")
        if triage.needs_more_info and triage.missing_info:
            lines.append("### Additional Information Needed")
            lines.append("")
            for info in triage.missing_info:
                lines.append(f"- {info}")
            lines.append("")
        if triage.is_duplicate and triage.duplicate_of:
            lines.append("### Possible Duplicate")
            lines.append(f"This issue may be a duplicate of #{triage.duplicate_of}")
            lines.append("")
        lines.append("---")
        lines.append(f"*{triage.reasoning}*")
        return "\n".join(lines)

    def _parse_command(self, body: str) -> str | None:
        """Parse a command from a comment body.

        Matching is case-insensitive on both sides: the original lowered
        the body but not the configured mention prefix, so a mixed-case
        prefix (e.g. "@AI-Bot") could never match.
        """
        interaction = self.config.get("interaction", {})
        mention_prefix = interaction.get("mention_prefix", "@ai-bot")
        commands = interaction.get(
            "commands", ["explain", "suggest", "security", "summarize"]
        )
        body_lower = body.lower()
        for command in commands:
            if f"{mention_prefix} {command}".lower() in body_lower:
                return command
        return None

    def _handle_command(self, context: AgentContext, issue: dict, command: str) -> str:
        """Handle a command from an @mention.

        Every command in the default interaction.commands list has a
        handler (the original advertised "security" but never handled it).
        """
        title = issue.get("title", "")
        body = issue.get("body", "")
        if command == "summarize":
            return self._command_summarize(title, body)
        elif command == "explain":
            return self._command_explain(title, body)
        elif command == "suggest":
            return self._command_suggest(title, body)
        elif command == "security":
            return self._command_security(title, body)
        return f"{self.AI_DISCLAIMER}\n\nSorry, I don't understand the command `{command}`."

    def _command_summarize(self, title: str, body: str) -> str:
        """Generate a summary of the issue."""
        prompt = f"""Summarize the following issue in 2-3 concise sentences:
Title: {title}
Body: {body}
Provide only the summary, no additional formatting."""
        try:
            response = self.call_llm(prompt)
            return f"{self.AI_DISCLAIMER}\n\n**Summary:**\n{response.content}"
        except Exception as e:
            return f"{self.AI_DISCLAIMER}\n\nSorry, I was unable to generate a summary. Error: {e}"

    def _command_explain(self, title: str, body: str) -> str:
        """Explain the issue in more detail."""
        prompt = f"""Analyze this issue and provide a clear explanation of what the user is asking for or reporting:
Title: {title}
Body: {body}
Provide:
1. What the issue is about
2. What the user expects
3. Any technical context that might be relevant
Be concise and helpful."""
        try:
            response = self.call_llm(prompt)
            return f"{self.AI_DISCLAIMER}\n\n**Explanation:**\n{response.content}"
        except Exception as e:
            return f"{self.AI_DISCLAIMER}\n\nSorry, I was unable to explain this issue. Error: {e}"

    def _command_suggest(self, title: str, body: str) -> str:
        """Suggest solutions for the issue."""
        prompt = f"""Based on this issue, suggest potential solutions or next steps:
Title: {title}
Body: {body}
Provide 2-3 actionable suggestions. If this is a bug, suggest debugging steps. If this is a feature request, suggest implementation approaches.
Be practical and concise."""
        try:
            response = self.call_llm(prompt)
            return f"{self.AI_DISCLAIMER}\n\n**Suggestions:**\n{response.content}"
        except Exception as e:
            return f"{self.AI_DISCLAIMER}\n\nSorry, I was unable to generate suggestions. Error: {e}"

    def _command_security(self, title: str, body: str) -> str:
        """Assess potential security implications of the issue.

        Handler for the "security" command, which is present in the
        default interaction.commands list.
        """
        prompt = f"""Analyze this issue from a security perspective:
Title: {title}
Body: {body}
Identify any potential security implications, risks, or concerns. If there are none, say so briefly.
Be practical and concise."""
        try:
            response = self.call_llm(prompt)
            return f"{self.AI_DISCLAIMER}\n\n**Security Assessment:**\n{response.content}"
        except Exception as e:
            return f"{self.AI_DISCLAIMER}\n\nSorry, I was unable to assess security for this issue. Error: {e}"

View File

@@ -0,0 +1,436 @@
"""Pull Request Review Agent
Enhanced AI agent for comprehensive PR reviews with inline comments,
security scanning, and automatic label management.
"""
import re
from dataclasses import dataclass, field
from agents.base_agent import AgentContext, AgentResult, BaseAgent
@dataclass
class ReviewIssue:
    """A single issue found in the PR.

    Produced both by the regex security scanner and by the LLM review
    (parsed from its JSON response).
    """

    file: str  # path of the file the issue was found in
    line: int | None  # line number in the new file, or None when unknown
    severity: str  # HIGH, MEDIUM, LOW
    category: str  # Security, Correctness, Performance, etc.
    description: str  # what the problem is
    recommendation: str  # suggested fix or mitigation
    code_snippet: str | None = None  # offending source excerpt, truncated by the scanner
@dataclass
class PRReviewResult:
    """Result of a PR review.

    Aggregates the LLM review output with the regex security-scan hits.
    """

    summary: str  # human-readable review overview
    issues: list[ReviewIssue]  # findings from the AI review
    overall_severity: str  # HIGH / MEDIUM / LOW, or UNKNOWN on review failure
    approval: bool  # True when the review recommends approving the PR
    security_issues: list[ReviewIssue] = field(default_factory=list)  # scanner findings
class PRAgent(BaseAgent):
"""Agent for handling pull request reviews."""
# Marker specific to PR reviews
PR_AI_MARKER = "<!-- AI_PR_REVIEW -->"
def can_handle(self, event_type: str, event_data: dict) -> bool:
"""Check if this agent handles the given event."""
# Check if agent is enabled
agent_config = self.config.get("agents", {}).get("pr", {})
if not agent_config.get("enabled", True):
return False
if event_type == "pull_request":
action = event_data.get("action", "")
allowed_events = agent_config.get("events", ["opened", "synchronize"])
return action in allowed_events
return False
def execute(self, context: AgentContext) -> AgentResult:
"""Execute the PR review agent."""
pr = context.event_data.get("pull_request", {})
pr_number = pr.get("number")
self.logger.info(f"Reviewing PR #{pr_number}: {pr.get('title')}")
actions_taken = []
# Step 1: Get PR diff
diff = self._get_diff(context.owner, context.repo, pr_number)
if not diff.strip():
return AgentResult(
success=True,
message="PR has no changes to review",
)
# Step 2: Parse changed files
changed_files = self._parse_diff_files(diff)
# Step 3: Run security scan if enabled
security_issues = []
agent_config = self.config.get("agents", {}).get("pr", {})
if agent_config.get("security_scan", True):
security_issues = self._run_security_scan(changed_files, diff)
if security_issues:
actions_taken.append(f"Found {len(security_issues)} security issues")
# Step 4: Run AI review
review_result = self._run_ai_review(diff, context, security_issues)
# Step 5: Post inline comments if enabled
if agent_config.get("inline_comments", True) and review_result.issues:
inline_count = self._post_inline_comments(
context.owner, context.repo, pr_number, review_result
)
actions_taken.append(f"Posted {inline_count} inline comments")
# Step 6: Post summary comment
summary_comment = self._generate_summary_comment(review_result)
self.upsert_comment(
context.owner,
context.repo,
pr_number,
summary_comment,
marker=self.PR_AI_MARKER,
)
actions_taken.append("Posted summary comment")
# Step 7: Apply labels
labels_applied = self._apply_review_labels(
context.owner, context.repo, pr_number, review_result
)
if labels_applied:
actions_taken.append(f"Applied labels: {labels_applied}")
return AgentResult(
success=True,
message=f"Reviewed PR #{pr_number}: {review_result.overall_severity} severity",
data={
"severity": review_result.overall_severity,
"approval": review_result.approval,
"issues_count": len(review_result.issues),
"security_issues_count": len(review_result.security_issues),
},
actions_taken=actions_taken,
)
def _get_diff(self, owner: str, repo: str, pr_number: int) -> str:
"""Get the PR diff, truncated if necessary."""
max_lines = self.config.get("review", {}).get("max_diff_lines", 800)
try:
diff = self.gitea.get_pull_request_diff(owner, repo, pr_number)
lines = diff.splitlines()
if len(lines) > max_lines:
return "\n".join(lines[:max_lines])
return diff
except Exception as e:
self.logger.error(f"Failed to get diff: {e}")
return ""
def _parse_diff_files(self, diff: str) -> dict[str, str]:
"""Parse diff into file -> content mapping."""
files = {}
current_file = None
current_content = []
for line in diff.splitlines():
if line.startswith("diff --git"):
if current_file:
files[current_file] = "\n".join(current_content)
# Extract file path from "diff --git a/path b/path"
match = re.search(r"b/(.+)$", line)
if match:
current_file = match.group(1)
current_content = []
elif current_file:
current_content.append(line)
if current_file:
files[current_file] = "\n".join(current_content)
return files
def _run_security_scan(
self, changed_files: dict[str, str], diff: str
) -> list[ReviewIssue]:
"""Run security pattern scanning on the diff."""
issues = []
# Security patterns to detect
patterns = [
{
"name": "Hardcoded Secrets",
"pattern": r'(?i)(api_key|apikey|secret|password|token|auth)\s*[=:]\s*["\'][^"\']{8,}["\']',
"severity": "HIGH",
"category": "Security",
"description": "Potential hardcoded secret or API key detected",
"recommendation": "Move secrets to environment variables or a secrets manager",
},
{
"name": "SQL Injection",
"pattern": r'(?i)(execute|query)\s*\([^)]*\+[^)]*\)|f["\'].*\{.*\}.*(?:SELECT|INSERT|UPDATE|DELETE)',
"severity": "HIGH",
"category": "Security",
"description": "Potential SQL injection vulnerability - string concatenation in query",
"recommendation": "Use parameterized queries or prepared statements",
},
{
"name": "Hardcoded IP",
"pattern": r'\b(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b',
"severity": "LOW",
"category": "Security",
"description": "Hardcoded IP address detected",
"recommendation": "Consider using configuration or DNS names instead",
},
{
"name": "Eval Usage",
"pattern": r'\beval\s*\(',
"severity": "HIGH",
"category": "Security",
"description": "Use of eval() detected - potential code injection risk",
"recommendation": "Avoid eval() - use safer alternatives like ast.literal_eval() for Python",
},
{
"name": "Shell Injection",
"pattern": r'(?i)(?:subprocess\.call|os\.system|shell\s*=\s*True)',
"severity": "MEDIUM",
"category": "Security",
"description": "Potential shell command execution - verify input is sanitized",
"recommendation": "Use subprocess with shell=False and pass arguments as a list",
},
]
for filename, content in changed_files.items():
# Only check added lines (starting with +)
added_lines = []
line_numbers = []
current_line = 0
for line in content.splitlines():
if line.startswith("@@"):
# Parse line number from @@ -x,y +a,b @@
match = re.search(r"\+(\d+)", line)
if match:
current_line = int(match.group(1)) - 1
elif line.startswith("+") and not line.startswith("+++"):
current_line += 1
added_lines.append((current_line, line[1:]))
elif not line.startswith("-"):
current_line += 1
# Check patterns on added lines
for line_num, line_content in added_lines:
for pattern_def in patterns:
if re.search(pattern_def["pattern"], line_content):
issues.append(
ReviewIssue(
file=filename,
line=line_num,
severity=pattern_def["severity"],
category=pattern_def["category"],
description=pattern_def["description"],
recommendation=pattern_def["recommendation"],
code_snippet=line_content.strip()[:100],
)
)
return issues
def _run_ai_review(
self,
diff: str,
context: AgentContext,
security_issues: list[ReviewIssue],
) -> PRReviewResult:
"""Run AI-based code review."""
prompt_template = self.load_prompt("base")
# Add security context if issues were found
security_context = ""
if security_issues:
security_context = "\n\nSECURITY SCAN RESULTS (already detected):\n"
for issue in security_issues[:5]: # Limit to first 5
security_context += f"- [{issue.severity}] {issue.file}:{issue.line} - {issue.description}\n"
prompt = f"{prompt_template}\n{security_context}\nDIFF:\n{diff}"
try:
result = self.call_llm_json(prompt)
issues = []
for issue_data in result.get("issues", []):
issues.append(
ReviewIssue(
file=issue_data.get("file", "unknown"),
line=issue_data.get("line"),
severity=issue_data.get("severity", "MEDIUM"),
category=issue_data.get("category", "General"),
description=issue_data.get("description", ""),
recommendation=issue_data.get("recommendation", ""),
code_snippet=issue_data.get("code_snippet"),
)
)
return PRReviewResult(
summary=result.get("summary", "Review completed"),
issues=issues,
overall_severity=result.get("overall_severity", "LOW"),
approval=result.get("approval", True),
security_issues=security_issues,
)
except Exception as e:
self.logger.error(f"AI review failed: {e}")
return PRReviewResult(
summary=f"AI review encountered an error: {e}",
issues=[],
overall_severity="UNKNOWN",
approval=False,
security_issues=security_issues,
)
def _post_inline_comments(
self,
owner: str,
repo: str,
pr_number: int,
review: PRReviewResult,
) -> int:
"""Post inline comments for issues with line numbers."""
comments = []
all_issues = review.issues + review.security_issues
for issue in all_issues:
if issue.line and issue.file:
comment_body = (
f"**[{issue.severity}] {issue.category}**\n\n"
f"{issue.description}\n\n"
f"**Recommendation:** {issue.recommendation}"
)
comments.append(
{
"path": issue.file,
"line": issue.line,
"body": comment_body,
}
)
if not comments:
return 0
try:
# Use Gitea's pull request review API for inline comments
self.gitea.create_pull_request_review(
owner=owner,
repo=repo,
index=pr_number,
body="AI Code Review - Inline Comments",
event="COMMENT",
comments=comments[:10], # Limit to 10 inline comments
)
return min(len(comments), 10)
except Exception as e:
self.logger.warning(f"Failed to post inline comments: {e}")
return 0
def _generate_summary_comment(self, review: PRReviewResult) -> str:
"""Generate the summary comment for the PR."""
lines = [
f"{self.AI_DISCLAIMER}",
"",
"## AI Code Review",
"",
review.summary,
"",
]
# Statistics
all_issues = review.issues + review.security_issues
high = sum(1 for i in all_issues if i.severity == "HIGH")
medium = sum(1 for i in all_issues if i.severity == "MEDIUM")
low = sum(1 for i in all_issues if i.severity == "LOW")
lines.append("### Summary")
lines.append("")
lines.append(f"| Severity | Count |")
lines.append(f"|----------|-------|")
lines.append(f"| HIGH | {high} |")
lines.append(f"| MEDIUM | {medium} |")
lines.append(f"| LOW | {low} |")
lines.append("")
# Security issues section
if review.security_issues:
lines.append("### Security Issues")
lines.append("")
for issue in review.security_issues[:5]:
lines.append(f"- **[{issue.severity}]** `{issue.file}:{issue.line}` - {issue.description}")
lines.append("")
# Other issues (limit display)
other_issues = [i for i in review.issues if i not in review.security_issues]
if other_issues:
lines.append("### Review Findings")
lines.append("")
for issue in other_issues[:10]:
loc = f"`{issue.file}:{issue.line}`" if issue.line else f"`{issue.file}`"
lines.append(f"- **[{issue.severity}]** {loc} - {issue.description}")
if len(other_issues) > 10:
lines.append(f"- ...and {len(other_issues) - 10} more issues")
lines.append("")
# Verdict
lines.append("---")
lines.append(f"**Overall Severity:** `{review.overall_severity}`")
if review.approval:
lines.append("**AI Recommendation:** Approve")
else:
lines.append("**AI Recommendation:** Changes Requested")
return "\n".join(lines)
    def _apply_review_labels(
        self,
        owner: str,
        repo: str,
        pr_number: int,
        review: PRReviewResult,
    ) -> list[str]:
        """Apply labels based on review result.

        Adds the configured "ai-approved" or "ai-changes-required" status
        label to the PR depending on the review verdict.

        Args:
            owner: Repository owner.
            repo: Repository name.
            pr_number: Pull request index (PRs share the issue label API).
            review: Completed review whose approval flag drives the label.

        Returns:
            Names of the labels that were applied; empty on failure.
        """
        labels_config = self.config.get("labels", {}).get("status", {})
        # Fetch the repo's labels so configured names can be mapped to IDs.
        try:
            repo_labels = self.gitea.get_repo_labels(owner, repo)
            label_map = {l["name"]: l["id"] for l in repo_labels}
        except Exception as e:
            self.logger.warning(f"Failed to get repo labels: {e}")
            return []
        labels_to_add = []
        # Add approval/changes required label
        if review.approval:
            label_name = labels_config.get("ai_approved", "ai-approved")
        else:
            label_name = labels_config.get("ai_changes_required", "ai-changes-required")
        # Skip silently when the repo doesn't define the configured label.
        if label_name in label_map:
            labels_to_add.append(label_map[label_name])
        if labels_to_add:
            try:
                self.gitea.add_issue_labels(owner, repo, pr_number, labels_to_add)
                # Translate applied IDs back to their label names.
                return [name for name, id in label_map.items() if id in labels_to_add]
            except Exception as e:
                self.logger.warning(f"Failed to add labels: {e}")
        return []