Files
openrabbit/tools/ai-review/agents/dependency_agent.py
latte e8d28225e0
All checks were successful
AI Codebase Quality Review / ai-codebase-review (push) Successful in 39s
just why not
2026-01-07 21:19:46 +01:00

549 lines
20 KiB
Python

"""Dependency Security Agent
AI agent for scanning dependency files for known vulnerabilities
and outdated packages. Supports multiple package managers.
"""
import base64
import json
import logging
import os
import re
import subprocess
from dataclasses import dataclass, field
from typing import Any
from agents.base_agent import AgentContext, AgentResult, BaseAgent
@dataclass
class DependencyFinding:
    """A single vulnerability reported against one dependency.

    Findings are created both by the built-in known-vulnerability table
    and by the optional external scanners, and are rendered into the
    report comment grouped by ``severity``.
    """

    # Package name as it appeared in the dependency file or scanner output.
    package: str
    # Detected version (or version range) of the package.
    version: str
    # Severity label: CRITICAL, HIGH, MEDIUM, LOW
    severity: str
    # Advisory identifier: CVE, GHSA, etc.
    vulnerability_id: str
    # Short headline shown in the report.
    title: str
    # Longer free-text explanation of the issue.
    description: str
    # Version that resolves the issue, when one is known.
    fixed_version: str | None = None
    # Optional list of reference strings; each instance gets its own list.
    references: list[str] = field(default_factory=list)
@dataclass
class DependencyReport:
    """Aggregated outcome of one dependency scan.

    All fields are required; the report is filled in by the analysis step
    and then rendered into a comment.
    """

    # Number of packages parsed across all scanned files.
    total_packages: int
    # Counter of vulnerability matches recorded during analysis.
    vulnerable_packages: int
    # Counter of outdated packages.
    outdated_packages: int
    # Individual vulnerability records.
    findings: list[DependencyFinding]
    # Human-readable follow-up suggestions.
    recommendations: list[str]
    # Repository paths of the dependency files that were read.
    files_scanned: list[str]
class DependencyAgent(BaseAgent):
"""Agent for scanning dependencies for security vulnerabilities."""
# Marker string passed as ``marker`` when the report comment is upserted
# in execute().
DEP_AI_MARKER = "<!-- AI_DEPENDENCY_SCAN -->"
# Supported dependency files, keyed by language.
# Entries starting with "*" are treated as filename-suffix patterns by
# _is_dependency_file() and are skipped by _collect_dependency_files().
# Only "python" and "javascript" files are actually parsed by
# _analyze_dependencies(); files of other languages are listed as scanned
# but contribute no packages.
DEPENDENCY_FILES = {
"python": ["requirements.txt", "Pipfile", "pyproject.toml", "setup.py"],
"javascript": ["package.json", "package-lock.json", "yarn.lock"],
"ruby": ["Gemfile", "Gemfile.lock"],
"go": ["go.mod", "go.sum"],
"rust": ["Cargo.toml", "Cargo.lock"],
"java": ["pom.xml", "build.gradle", "build.gradle.kts"],
"php": ["composer.json", "composer.lock"],
"dotnet": ["*.csproj", "packages.config", "*.fsproj"],
}
# Built-in table of known-vulnerable packages.
# Layout: language -> lower-case package name -> {constraint: description}.
# Package names are looked up with ``pkg_name.lower()``, so keys must be
# lower-case. Only constraints of the form "< X.Y.Z" are evaluated by
# _version_matches_constraint(); X.Y.Z is reported as the fixed version.
# The text before " - " in a description becomes the finding's
# vulnerability_id; descriptions without " - " get the id "VULN".
KNOWN_VULNERABILITIES = {
"python": {
"requests": {
"< 2.31.0": "CVE-2023-32681 - Proxy-Authorization header leak"
},
"urllib3": {
"< 2.0.7": "CVE-2023-45803 - Request body not stripped on redirects"
},
"cryptography": {"< 41.0.0": "Multiple CVEs - Update recommended"},
"pillow": {"< 10.0.0": "CVE-2023-4863 - WebP vulnerability"},
"django": {"< 4.2.0": "Multiple security fixes"},
"flask": {"< 2.3.0": "Security improvements"},
"pyyaml": {"< 6.0": "CVE-2020-14343 - Arbitrary code execution"},
"jinja2": {"< 3.1.0": "Security fixes"},
},
"javascript": {
"lodash": {"< 4.17.21": "CVE-2021-23337 - Prototype pollution"},
"axios": {"< 1.6.0": "CVE-2023-45857 - CSRF vulnerability"},
"express": {"< 4.18.0": "Security updates"},
"jquery": {"< 3.5.0": "XSS vulnerabilities"},
"minimist": {"< 1.2.6": "Prototype pollution"},
"node-fetch": {"< 3.3.0": "Security fixes"},
},
}
def can_handle(self, event_type: str, event_data: dict) -> bool:
    """Decide whether this agent should run for the given event.

    The agent runs when it is enabled in ``config["agents"]["dependency"]``
    (enabled by default) and either:

    * a ``pull_request`` event with action ``opened`` or ``synchronize``
      lists at least one dependency file in ``event_data["files"]``, or
    * an ``issue_comment`` event contains ``"<mention_prefix> check-deps"``.

    Args:
        event_type: Event name, e.g. ``"pull_request"`` or ``"issue_comment"``.
        event_data: Event payload.

    Returns:
        True if the agent should handle the event, False otherwise.
    """
    agent_config = self.config.get("agents", {}).get("dependency", {})
    if not agent_config.get("enabled", True):
        return False

    # Handle PR events that modify dependency files
    if event_type == "pull_request":
        action = event_data.get("action", "")
        if action in ("opened", "synchronize"):
            # ``files`` may be absent or null in the payload.
            for changed in event_data.get("files") or []:
                if self._is_dependency_file(changed.get("filename", "")):
                    return True

    # Handle "<mention_prefix> check-deps" command
    if event_type == "issue_comment":
        # ``comment`` / ``body`` may be null; treat that as an empty comment.
        comment_body = (event_data.get("comment") or {}).get("body") or ""
        mention_prefix = self.config.get("interaction", {}).get(
            "mention_prefix", "@codebot"
        )
        # Lower-case BOTH sides: the body is lower-cased, so a configured
        # prefix containing upper-case letters could otherwise never match.
        command = f"{mention_prefix} check-deps".lower()
        if command in comment_body.lower():
            return True

    return False
def _is_dependency_file(self, filename: str) -> bool:
"""Check if a file is a dependency file."""
basename = os.path.basename(filename)
for lang, files in self.DEPENDENCY_FILES.items():
for pattern in files:
if pattern.startswith("*"):
if basename.endswith(pattern[1:]):
return True
elif basename == pattern:
return True
return False
def execute(self, context: AgentContext) -> AgentResult:
    """Scan the repository's dependency files and post a report.

    Steps:
      1. Work out which issue/PR number to comment on (and, for comment
         commands, who asked).
      2. Fetch the known dependency files from the repository.
      3. Check parsed packages against ``KNOWN_VULNERABILITIES``.
      4. Merge in findings from the optional external scanners.
      5. Upsert one report comment identified by ``DEP_AI_MARKER``.

    Args:
        context: Event context providing ``owner``, ``repo``,
            ``event_type`` and ``event_data``.

    Returns:
        An ``AgentResult`` with ``success=True``. When dependency files
        were found it also carries summary counts in ``data`` and the
        list of actions taken.
    """
    self.logger.info(f"Scanning dependencies for {context.owner}/{context.repo}")
    actions_taken = []

    # Determine if this is a command or PR event
    if context.event_type == "issue_comment":
        issue = context.event_data.get("issue", {})
        issue_number = issue.get("number")
        comment_author = (
            context.event_data.get("comment", {})
            .get("user", {})
            .get("login", "user")
        )
    else:
        pr = context.event_data.get("pull_request", {})
        issue_number = pr.get("number")
        comment_author = None

    # Collect dependency files
    dep_files = self._collect_dependency_files(context.owner, context.repo)
    if not dep_files:
        message = "No dependency files found in repository."
        if issue_number:
            self.gitea.create_issue_comment(
                context.owner,
                context.repo,
                issue_number,
                f"{self.AI_DISCLAIMER}\n\n{message}",
            )
        return AgentResult(
            success=True,
            message=message,
        )
    actions_taken.append(f"Found {len(dep_files)} dependency files")

    # Analyze dependencies
    report = self._analyze_dependencies(context.owner, context.repo, dep_files)
    actions_taken.append(f"Analyzed {report.total_packages} packages")

    # Run external scanners if available
    external_findings = self._run_external_scanners(context.owner, context.repo)
    if external_findings:
        # Keep the summary counter in step with the findings list.
        # Without this the result message and the report footer could say
        # "0 vulnerable" while scanner findings are listed. Only packages
        # not already flagged by the built-in table are added, to avoid
        # double counting.
        already_flagged = {str(f.package).lower() for f in report.findings}
        newly_flagged = {
            str(f.package).lower() for f in external_findings
        } - already_flagged
        report.findings.extend(external_findings)
        report.vulnerable_packages += len(newly_flagged)
        actions_taken.append(
            f"External scanner found {len(external_findings)} issues"
        )

    # Generate and post report
    if issue_number:
        comment = self._format_dependency_report(report, comment_author)
        self.upsert_comment(
            context.owner,
            context.repo,
            issue_number,
            comment,
            marker=self.DEP_AI_MARKER,
        )
        actions_taken.append("Posted dependency report")

    return AgentResult(
        success=True,
        message=f"Dependency scan complete: {report.vulnerable_packages} vulnerable, {report.outdated_packages} outdated",
        data={
            "total_packages": report.total_packages,
            "vulnerable_packages": report.vulnerable_packages,
            "outdated_packages": report.outdated_packages,
            "findings_count": len(report.findings),
        },
        actions_taken=actions_taken,
    )
def _collect_dependency_files(
self, owner: str, repo: str
) -> dict[str, dict[str, Any]]:
"""Collect all dependency files from the repository."""
dep_files = {}
# Common paths to check
paths_to_check = [
"", # Root
"backend/",
"frontend/",
"api/",
"services/",
]
for base_path in paths_to_check:
for lang, filenames in self.DEPENDENCY_FILES.items():
for filename in filenames:
if filename.startswith("*"):
continue # Skip glob patterns for now
filepath = f"{base_path}{filename}".lstrip("/")
try:
content_data = self.gitea.get_file_contents(
owner, repo, filepath
)
if content_data.get("content"):
content = base64.b64decode(content_data["content"]).decode(
"utf-8", errors="ignore"
)
dep_files[filepath] = {
"language": lang,
"content": content,
}
except Exception:
pass # File doesn't exist
return dep_files
def _analyze_dependencies(
    self, owner: str, repo: str, dep_files: dict
) -> DependencyReport:
    """Match parsed packages against the built-in vulnerability table.

    Args:
        owner: Repository owner (not used by the analysis itself).
        repo: Repository name (not used by the analysis itself).
        dep_files: Mapping of path to ``{"language", "content"}`` entries.

    Returns:
        A ``DependencyReport``. Only Python and JavaScript files yield
        packages; other languages are listed in ``files_scanned`` only.
        ``outdated_packages`` is always 0 here.
    """
    # Language -> parser; languages without a parser contribute no packages.
    parsers = {
        "python": self._parse_python_deps,
        "javascript": self._parse_javascript_deps,
    }
    files_scanned = list(dep_files.keys())
    findings = []
    recommendations = []
    total_packages = 0
    vulnerable_count = 0
    outdated_count = 0

    for filepath, file_info in dep_files.items():
        lang = file_info["language"]
        content = file_info["content"]
        parse = parsers.get(lang)
        packages = parse(content, filepath) if parse else []
        total_packages += len(packages)

        # Check for known vulnerabilities
        known_vulns = self.KNOWN_VULNERABILITIES.get(lang, {})
        for pkg_name, version in packages:
            vuln_info = known_vulns.get(pkg_name.lower())
            if vuln_info is None:
                continue
            for version_constraint, vuln_desc in vuln_info.items():
                if not self._version_matches_constraint(
                    version, version_constraint
                ):
                    continue
                # Text before " - " is the advisory id, if there is one.
                advisory, separator, _ = vuln_desc.partition(" - ")
                findings.append(
                    DependencyFinding(
                        package=pkg_name,
                        version=version or "unknown",
                        severity="HIGH",
                        vulnerability_id=advisory if separator else "VULN",
                        title=vuln_desc,
                        description=f"Package {pkg_name} version {version} has known vulnerabilities",
                        fixed_version=version_constraint.replace("< ", ""),
                    )
                )
                vulnerable_count += 1

    # Add recommendations
    if vulnerable_count > 0:
        recommendations.append(
            f"Update {vulnerable_count} packages with known vulnerabilities"
        )
    if total_packages > 50:
        recommendations.append(
            "Consider auditing dependencies to reduce attack surface"
        )

    return DependencyReport(
        total_packages=total_packages,
        vulnerable_packages=vulnerable_count,
        outdated_packages=outdated_count,
        findings=findings,
        recommendations=recommendations,
        files_scanned=files_scanned,
    )
def _parse_python_deps(
self, content: str, filepath: str
) -> list[tuple[str, str | None]]:
"""Parse Python dependency file."""
packages = []
if "requirements" in filepath.lower():
# requirements.txt format
for line in content.splitlines():
line = line.strip()
if not line or line.startswith("#") or line.startswith("-"):
continue
# Parse package==version, package>=version, package
match = re.match(r"([a-zA-Z0-9_-]+)([<>=!]+)?(.+)?", line)
if match:
pkg_name = match.group(1)
version = match.group(3) if match.group(3) else None
packages.append((pkg_name, version))
elif filepath.endswith("pyproject.toml"):
# pyproject.toml format
in_deps = False
for line in content.splitlines():
if (
"[project.dependencies]" in line
or "[tool.poetry.dependencies]" in line
):
in_deps = True
continue
if in_deps:
if line.startswith("["):
in_deps = False
continue
match = re.match(r'"?([a-zA-Z0-9_-]+)"?\s*[=<>]', line)
if match:
packages.append((match.group(1), None))
return packages
def _parse_javascript_deps(
self, content: str, filepath: str
) -> list[tuple[str, str | None]]:
"""Parse JavaScript dependency file."""
packages = []
if filepath.endswith("package.json"):
try:
data = json.loads(content)
for dep_type in ["dependencies", "devDependencies"]:
deps = data.get(dep_type, {})
for name, version in deps.items():
# Strip version prefixes like ^, ~, >=
clean_version = re.sub(r"^[\^~>=<]+", "", version)
packages.append((name, clean_version))
except json.JSONDecodeError:
pass
return packages
def _version_matches_constraint(self, version: str | None, constraint: str) -> bool:
"""Check if version matches a vulnerability constraint."""
if not version:
return True # Assume vulnerable if version unknown
# Simple version comparison
if constraint.startswith("< "):
target = constraint[2:]
try:
return self._compare_versions(version, target) < 0
except Exception:
return False
return False
def _compare_versions(self, v1: str, v2: str) -> int:
"""Compare two version strings. Returns -1, 0, or 1."""
def normalize(v):
return [int(x) for x in re.sub(r"[^0-9.]", "", v).split(".") if x]
try:
parts1 = normalize(v1)
parts2 = normalize(v2)
for i in range(max(len(parts1), len(parts2))):
p1 = parts1[i] if i < len(parts1) else 0
p2 = parts2[i] if i < len(parts2) else 0
if p1 < p2:
return -1
if p1 > p2:
return 1
return 0
except Exception:
return 0
def _run_external_scanners(self, owner: str, repo: str) -> list[DependencyFinding]:
    """Run the optional ``pip-audit`` / ``npm audit`` scanners.

    Each scanner runs only when its flag (``pip_audit`` / ``npm_audit``)
    is set in ``config["agents"]["dependency"]``. Scanning is best-effort:
    a missing tool, a timeout or unparseable output is logged at debug
    level and yields no findings for that tool.

    NOTE(review): both commands are started without a ``cwd`` and
    ``owner``/``repo`` are not used, so they audit whatever environment or
    directory the agent process runs in. Confirm this is the checked-out
    repository.

    Args:
        owner: Repository owner (currently unused).
        repo: Repository name (currently unused).

    Returns:
        Findings reported by the enabled scanners; empty if none ran.
    """
    findings = []
    agent_config = self.config.get("agents", {}).get("dependency", {})

    def run_json(command, tool):
        """Run *command* and return its stdout decoded as JSON, or None."""
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=60,
            )
        except (OSError, subprocess.SubprocessError) as e:
            self.logger.debug(f"{tool} not available: {e}")
            return None
        # Do NOT gate on the exit status: both tools exit non-zero
        # precisely when they have vulnerabilities to report.
        try:
            return json.loads(result.stdout)
        except ValueError as e:
            self.logger.debug(
                f"{tool} output could not be parsed "
                f"(exit code {result.returncode}): {e}"
            )
            return None

    # Try pip-audit for Python
    if agent_config.get("pip_audit", False):
        data = run_json(["pip-audit", "--format", "json"], "pip-audit")
        # Current pip-audit emits {"dependencies": [{name, version,
        # vulns: [...]}]}; older releases emitted that list directly.
        if isinstance(data, dict):
            dependencies = data.get("dependencies") or []
            # Flat "vulnerabilities" records (the shape this method read
            # historically) are still accepted.
            flat_records = data.get("vulnerabilities") or []
        elif isinstance(data, list):
            dependencies, flat_records = data, []
        else:
            dependencies, flat_records = [], []

        # Normalise both shapes to (package record, vulnerability record).
        pairs = [
            (dep, vuln)
            for dep in dependencies
            if isinstance(dep, dict)
            for vuln in (dep.get("vulns") or [])
        ]
        pairs.extend((vuln, vuln) for vuln in flat_records)
        for dep, vuln in pairs:
            if not isinstance(dep, dict) or not isinstance(vuln, dict):
                continue
            description = vuln.get("description") or ""
            # fix_versions may be an empty list when no fix exists.
            fix_versions = vuln.get("fix_versions") or []
            findings.append(
                DependencyFinding(
                    package=dep.get("name", ""),
                    version=dep.get("version", ""),
                    severity=vuln.get("severity") or "MEDIUM",
                    vulnerability_id=vuln.get("id", ""),
                    title=description[:100],
                    description=description,
                    fixed_version=fix_versions[0] if fix_versions else None,
                )
            )

    # Try npm audit for JavaScript
    if agent_config.get("npm_audit", False):
        data = run_json(["npm", "audit", "--json"], "npm audit")
        vulnerabilities = (
            data.get("vulnerabilities") if isinstance(data, dict) else None
        )
        if isinstance(vulnerabilities, dict):
            # npm's labels mapped onto the four severities used in reports.
            severity_aliases = {"MODERATE": "MEDIUM", "INFO": "LOW"}
            for vuln_id, vuln in vulnerabilities.items():
                if not isinstance(vuln, dict):
                    continue
                severity = str(vuln.get("severity") or "moderate").upper()
                severity = severity_aliases.get(severity, severity)
                # fixAvailable is either a bool or an object with "version".
                fix = vuln.get("fixAvailable")
                fixed_version = fix.get("version") if isinstance(fix, dict) else None
                # Advisory titles live in the "via" entries when the
                # record itself has no "title".
                title = vuln.get("title") or ""
                if not title:
                    for source in vuln.get("via") or []:
                        if isinstance(source, dict) and source.get("title"):
                            title = source["title"]
                            break
                findings.append(
                    DependencyFinding(
                        package=vuln.get("name", vuln_id),
                        version=vuln.get("range", ""),
                        severity=severity,
                        vulnerability_id=vuln_id,
                        title=title,
                        description=vuln.get("overview", ""),
                        fixed_version=fixed_version,
                    )
                )
    return findings
def _format_dependency_report(
self, report: DependencyReport, user: str | None = None
) -> str:
"""Format the dependency report as a comment."""
lines = []
if user:
lines.append(f"@{user}")
lines.append("")
lines.extend(
[
f"{self.AI_DISCLAIMER}",
"",
"## 🔍 Dependency Security Scan",
"",
"### Summary",
"",
f"| Metric | Value |",
f"|--------|-------|",
f"| Total Packages | {report.total_packages} |",
f"| Vulnerable | {report.vulnerable_packages} |",
f"| Outdated | {report.outdated_packages} |",
f"| Files Scanned | {len(report.files_scanned)} |",
"",
]
)
# Findings by severity
if report.findings:
lines.append("### 🚨 Security Findings")
lines.append("")
# Group by severity
by_severity = {"CRITICAL": [], "HIGH": [], "MEDIUM": [], "LOW": []}
for finding in report.findings:
sev = finding.severity.upper()
if sev in by_severity:
by_severity[sev].append(finding)
severity_emoji = {
"CRITICAL": "🔴",
"HIGH": "🟠",
"MEDIUM": "🟡",
"LOW": "🔵",
}
for severity in ["CRITICAL", "HIGH", "MEDIUM", "LOW"]:
findings = by_severity[severity]
if findings:
lines.append(f"#### {severity_emoji[severity]} {severity}")
lines.append("")
for f in findings[:10]: # Limit display
lines.append(f"- **{f.package}** `{f.version}`")
lines.append(f" - {f.vulnerability_id}: {f.title}")
if f.fixed_version:
lines.append(f" - ✅ Fix: Upgrade to `{f.fixed_version}`")
if len(findings) > 10:
lines.append(f" - ... and {len(findings) - 10} more")
lines.append("")
# Files scanned
lines.append("### 📁 Files Scanned")
lines.append("")
for f in report.files_scanned:
lines.append(f"- `{f}`")
lines.append("")
# Recommendations
if report.recommendations:
lines.append("### 💡 Recommendations")
lines.append("")
for rec in report.recommendations:
lines.append(f"- {rec}")
lines.append("")
# Overall status
if report.vulnerable_packages == 0:
lines.append("---")
lines.append("✅ **No known vulnerabilities detected**")
else:
lines.append("---")
lines.append(
f"⚠️ **{report.vulnerable_packages} vulnerable packages require attention**"
)
return "\n".join(lines)