Files
openrabbit/tools/ai-review/agents/codebase_agent.py
latte 69d9963597
All checks were successful
Enterprise AI Code Review / ai-review (pull_request) Successful in 32s
update
2025-12-28 14:10:04 +00:00

490 lines
16 KiB
Python

"""Codebase Quality Agent
AI agent for analyzing overall codebase health, architecture,
technical debt, and documentation coverage.
"""
import base64
import os
from dataclasses import dataclass, field
from agents.base_agent import AgentContext, AgentResult, BaseAgent
@dataclass
class CodebaseMetrics:
    """Metrics collected from codebase analysis."""

    # Number of code files considered (after ignore-pattern filtering).
    total_files: int = 0
    # Sum of line counts across the files whose content was actually fetched.
    total_lines: int = 0
    # Language name -> file count; keys come from CodebaseAgent.CODE_EXTENSIONS.
    languages: dict[str, int] = field(default_factory=dict)
    # Lines containing "TODO" (case-insensitive substring match).
    todo_count: int = 0
    # Lines containing "FIXME" (case-insensitive substring match).
    fixme_count: int = 0
    # Lines containing "DEPRECATED" (case-insensitive substring match).
    deprecated_count: int = 0
    # Python files that define functions but contain no triple-quoted string.
    missing_docstrings: int = 0
@dataclass
class CodebaseReport:
    """Complete codebase analysis report."""

    # Short natural-language assessment (2-3 sentences from the LLM, or a
    # metrics summary when the fallback path is used).
    summary: str
    # Overall health score in the range 0-100.
    health_score: float
    # Raw metrics the report was built from.
    metrics: CodebaseMetrics
    # Issue dicts; expected keys: "severity", "category", "description",
    # "recommendation" (shape requested from the LLM — not validated here).
    issues: list[dict]
    # Top actionable recommendation strings.
    recommendations: list[str]
    # Free-form observations about code structure and patterns.
    architecture_notes: list[str]
class CodebaseAgent(BaseAgent):
    """Agent for codebase quality analysis."""

    # Hidden HTML comment embedded in report issue bodies so an existing
    # report issue can be found and updated instead of duplicated.
    CODEBASE_AI_MARKER = "<!-- AI_CODEBASE_REVIEW -->"

    # File extensions to analyze, mapped to human-readable language names.
    CODE_EXTENSIONS = {
        ".py": "Python",
        ".js": "JavaScript",
        ".ts": "TypeScript",
        ".go": "Go",
        ".rs": "Rust",
        ".java": "Java",
        ".rb": "Ruby",
        ".php": "PHP",
        ".c": "C",
        ".cpp": "C++",
        ".h": "C/C++ Header",
        ".cs": "C#",
        ".swift": "Swift",
        ".kt": "Kotlin",
    }

    # Path substrings that exclude a file from analysis (dependency dirs,
    # VCS metadata, build output, minified assets). NOTE: matched as plain
    # substrings of the full path, not as path components.
    IGNORE_PATTERNS = [
        "node_modules/",
        "vendor/",
        ".git/",
        "__pycache__/",
        ".venv/",
        "dist/",
        "build/",
        ".min.js",
        ".min.css",
    ]
def can_handle(self, event_type: str, event_data: dict) -> bool:
"""Check if this agent handles the given event."""
agent_config = self.config.get("agents", {}).get("codebase", {})
if not agent_config.get("enabled", True):
return False
# Handle manual trigger via workflow_dispatch or schedule
if event_type in ("workflow_dispatch", "schedule"):
return True
# Handle special issue command
if event_type == "issue_comment":
comment_body = event_data.get("comment", {}).get("body", "")
mention_prefix = self.config.get("interaction", {}).get(
"mention_prefix", "@ai-bot"
)
if f"{mention_prefix} codebase" in comment_body.lower():
return True
return False
def execute(self, context: AgentContext) -> AgentResult:
"""Execute codebase analysis."""
self.logger.info(
f"Starting codebase analysis for {context.owner}/{context.repo}"
)
actions_taken = []
# Step 1: Collect file list from repository
files = self._collect_files(context.owner, context.repo)
self.logger.info(f"Found {len(files)} files to analyze")
# Step 2: Analyze metrics
metrics = self._analyze_metrics(context.owner, context.repo, files)
actions_taken.append(f"Analyzed {metrics.total_files} files")
# Step 3: Run AI analysis on key files
report = self._run_ai_analysis(context, files, metrics)
actions_taken.append("Generated AI analysis report")
# Step 4: Create or update report issue
issue_number = self._create_report_issue(context, report)
actions_taken.append(f"Created/updated report issue #{issue_number}")
return AgentResult(
success=True,
message=f"Codebase analysis complete - Health Score: {report.health_score:.0f}/100",
data={
"health_score": report.health_score,
"total_files": metrics.total_files,
"issues_found": len(report.issues),
},
actions_taken=actions_taken,
)
def _collect_files(self, owner: str, repo: str) -> list[dict]:
"""Collect list of files from the repository."""
files = []
def traverse(path: str = ""):
try:
contents = self.gitea.get_file_contents(owner, repo, path or ".")
if isinstance(contents, list):
for item in contents:
item_path = item.get("path", "")
# Skip ignored patterns
if any(p in item_path for p in self.IGNORE_PATTERNS):
continue
if item.get("type") == "file":
ext = os.path.splitext(item_path)[1]
if ext in self.CODE_EXTENSIONS:
files.append(item)
elif item.get("type") == "dir":
traverse(item_path)
except Exception as e:
self.logger.warning(f"Failed to list {path}: {e}")
traverse()
return files[:100] # Limit to prevent API exhaustion
def _analyze_metrics(
self,
owner: str,
repo: str,
files: list[dict],
) -> CodebaseMetrics:
"""Analyze metrics from files."""
metrics = CodebaseMetrics()
metrics.total_files = len(files)
for file_info in files[:50]: # Analyze top 50 files
filepath = file_info.get("path", "")
ext = os.path.splitext(filepath)[1]
lang = self.CODE_EXTENSIONS.get(ext, "Unknown")
metrics.languages[lang] = metrics.languages.get(lang, 0) + 1
try:
content_data = self.gitea.get_file_contents(owner, repo, filepath)
if content_data.get("content"):
content = base64.b64decode(content_data["content"]).decode(
"utf-8", errors="ignore"
)
lines = content.splitlines()
metrics.total_lines += len(lines)
# Count markers
for line in lines:
line_upper = line.upper()
if "TODO" in line_upper:
metrics.todo_count += 1
if "FIXME" in line_upper:
metrics.fixme_count += 1
if "DEPRECATED" in line_upper:
metrics.deprecated_count += 1
# Check for docstrings (Python)
if ext == ".py":
if "def " in content and '"""' not in content:
metrics.missing_docstrings += 1
except Exception as e:
self.logger.debug(f"Could not analyze {filepath}: {e}")
return metrics
    def _run_ai_analysis(
        self,
        context: AgentContext,
        files: list[dict],
        metrics: CodebaseMetrics,
    ) -> CodebaseReport:
        """Run AI analysis on the codebase.

        Builds a prompt from the collected metrics, a sample of the file
        list, and key configuration file contents, then asks the LLM for a
        JSON assessment. If the LLM call or JSON parsing fails, falls back
        to a purely metric-driven report.
        """
        # Prepare context for AI: at most 30 file paths and the per-language
        # file counts, rendered as markdown bullet lists.
        file_list = "\n".join([f"- {f.get('path', '')}" for f in files[:30]])
        language_breakdown = "\n".join(
            [f"- {lang}: {count} files" for lang, count in metrics.languages.items()]
        )
        # Sample some key files (README, manifests, ...) for deeper analysis.
        key_files_content = self._get_key_files_content(
            context.owner, context.repo, files
        )
        prompt = f"""Analyze this codebase and provide a comprehensive quality assessment.
## Repository: {context.owner}/{context.repo}
## Metrics
- Total Files: {metrics.total_files}
- Total Lines: {metrics.total_lines}
- TODO Comments: {metrics.todo_count}
- FIXME Comments: {metrics.fixme_count}
- Deprecated Markers: {metrics.deprecated_count}
## Language Breakdown
{language_breakdown}
## File Structure (sample)
{file_list}
## Key Files Content
{key_files_content}
## Analysis Required
Provide your analysis as JSON with this structure:
```json
{{
  "summary": "Overall assessment in 2-3 sentences",
  "health_score": 0-100,
  "issues": [
    {{
      "severity": "HIGH|MEDIUM|LOW",
      "category": "Architecture|Code Quality|Security|Testing|Documentation",
      "description": "Issue description",
      "recommendation": "How to fix"
    }}
  ],
  "recommendations": ["Top 3-5 actionable recommendations"],
  "architecture_notes": ["Observations about code structure and patterns"]
}}
```
Be constructive and actionable. Focus on the most impactful improvements.
"""
        try:
            # call_llm_json is expected to return the parsed JSON dict;
            # missing keys fall back to neutral defaults below.
            result = self.call_llm_json(prompt)
            return CodebaseReport(
                summary=result.get("summary", "Analysis complete"),
                health_score=float(result.get("health_score", 50)),
                metrics=metrics,
                issues=result.get("issues", []),
                recommendations=result.get("recommendations", []),
                architecture_notes=result.get("architecture_notes", []),
            )
        except Exception as e:
            self.logger.error(f"AI analysis failed: {e}")
            self.logger.debug(f"Full error details: {type(e).__name__}: {str(e)}")
            # Fallback: heuristic health score starting from 70, penalized
            # by high marker counts (max penalty: 25 points).
            health_score = 70
            if metrics.todo_count > 10:
                health_score -= 10
            if metrics.fixme_count > 5:
                health_score -= 10
            if metrics.deprecated_count > 3:
                health_score -= 5
            # Build recommendations from whichever metrics look problematic.
            recommendations = []
            if metrics.todo_count > 5:
                recommendations.append(
                    f"Review and address {metrics.todo_count} TODO comments"
                )
            if metrics.fixme_count > 0:
                recommendations.append(
                    f"Review and fix {metrics.fixme_count} FIXME markers"
                )
            if metrics.deprecated_count > 0:
                recommendations.append(
                    f"Update {metrics.deprecated_count} deprecated code sections"
                )
            if metrics.missing_docstrings > 5:
                recommendations.append("Consider adding more documentation")
            if not recommendations:
                recommendations.append("Codebase appears well-maintained")
            return CodebaseReport(
                summary=f"Basic metrics analysis complete. {metrics.total_files} files analyzed across {len(metrics.languages)} languages.",
                health_score=health_score,
                metrics=metrics,
                issues=[],
                recommendations=recommendations,
                architecture_notes=[
                    f"Primary languages: {', '.join(list(metrics.languages.keys())[:3])}"
                ],
            )
def _get_key_files_content(
self,
owner: str,
repo: str,
files: list[dict],
) -> str:
"""Get content of key files for AI analysis."""
key_file_names = [
"README.md",
"setup.py",
"pyproject.toml",
"package.json",
"Cargo.toml",
"go.mod",
"Makefile",
"Dockerfile",
]
content_parts = []
for file_info in files:
filepath = file_info.get("path", "")
filename = os.path.basename(filepath)
if filename in key_file_names:
try:
content_data = self.gitea.get_file_contents(owner, repo, filepath)
if content_data.get("content"):
content = base64.b64decode(content_data["content"]).decode(
"utf-8", errors="ignore"
)
# Truncate long files
if len(content) > 2000:
content = content[:2000] + "\n... (truncated)"
content_parts.append(f"### {filepath}\n```\n{content}\n```")
except Exception:
pass
return "\n\n".join(content_parts[:5]) or "No key configuration files found."
    def _create_report_issue(
        self,
        context: AgentContext,
        report: CodebaseReport,
    ) -> int:
        """Create or update a report issue.

        Searches open issues labelled "ai-codebase-report" for one whose
        body carries CODEBASE_AI_MARKER and updates it in place; otherwise
        creates a fresh issue. Returns the issue number, or 0 if creation
        failed.
        """
        # Generate issue body
        body = self._generate_report_body(report)
        # Look for an existing report issue to update instead of duplicating.
        try:
            issues = self.gitea.list_issues(
                context.owner, context.repo, state="open", labels=["ai-codebase-report"]
            )
            for issue in issues:
                if self.CODEBASE_AI_MARKER in issue.get("body", ""):
                    # Update existing issue body
                    self.gitea.update_issue(
                        context.owner,
                        context.repo,
                        issue["number"],
                        body=body,
                    )
                    return issue["number"]
        except Exception as e:
            # Lookup failure is non-fatal: fall through and create a new issue.
            self.logger.warning(f"Failed to check for existing report: {e}")
        # Create new issue
        try:
            # Resolve the label name to its numeric ID; a missing label is
            # fine — the issue is simply created unlabelled.
            labels = []
            try:
                repo_labels = self.gitea.get_repo_labels(context.owner, context.repo)
                for label in repo_labels:
                    if label["name"] == "ai-codebase-report":
                        labels.append(label["id"])
                        break
            except Exception:
                pass
            issue = self.gitea.create_issue(
                context.owner,
                context.repo,
                title=f"AI Codebase Report - {context.repo}",
                body=body,
                labels=labels,
            )
            return issue["number"]
        except Exception as e:
            self.logger.error(f"Failed to create report issue: {e}")
            # Signal failure with 0 rather than raising; the caller embeds
            # the number in its status message.
            return 0
def _generate_report_body(self, report: CodebaseReport) -> str:
"""Generate the report issue body."""
health_emoji = (
"🟢"
if report.health_score >= 80
else ("🟡" if report.health_score >= 60 else "🔴")
)
lines = [
f"{self.AI_DISCLAIMER}",
"",
"# AI Codebase Quality Report",
"",
f"## Health Score: {report.health_score:.0f}/100",
"",
report.summary,
"",
"---",
"",
"## Metrics",
"",
"| Metric | Value |",
"|--------|-------|",
f"| Total Files | {report.metrics.total_files} |",
f"| Total Lines | {report.metrics.total_lines:,} |",
f"| TODO Comments | {report.metrics.todo_count} |",
f"| FIXME Comments | {report.metrics.fixme_count} |",
f"| Deprecated | {report.metrics.deprecated_count} |",
"",
]
# Languages
if report.metrics.languages:
lines.append("### Languages")
lines.append("")
for lang, count in sorted(
report.metrics.languages.items(), key=lambda x: -x[1]
):
lines.append(f"- **{lang}**: {count} files")
lines.append("")
# Issues
if report.issues:
lines.append("## Issues Found")
lines.append("")
for issue in report.issues[:10]:
severity = issue.get("severity", "MEDIUM")
emoji = (
"🔴"
if severity == "HIGH"
else ("🟡" if severity == "MEDIUM" else "🟢")
)
lines.append(f"### [{severity}] {issue.get('category', 'General')}")
lines.append("")
lines.append(issue.get("description", ""))
lines.append("")
lines.append(f"**Recommendation:** {issue.get('recommendation', '')}")
lines.append("")
# Recommendations
if report.recommendations:
lines.append("## Recommendations")
lines.append("")
for i, rec in enumerate(report.recommendations[:5], 1):
lines.append(f"{i}. {rec}")
lines.append("")
# Architecture notes
if report.architecture_notes:
lines.append("## Architecture Notes")
lines.append("")
for note in report.architecture_notes[:5]:
lines.append(f"- {note}")
lines.append("")
lines.append("---")
lines.append(f"*Generated by AI Codebase Agent*")
return "\n".join(lines)