All checks were successful
AI Codebase Quality Review / ai-codebase-review (push) Successful in 39s
549 lines
20 KiB
Python
549 lines
20 KiB
Python
"""Dependency Security Agent
|
|
|
|
AI agent for scanning dependency files for known vulnerabilities
|
|
and outdated packages. Supports multiple package managers.
|
|
"""
|
|
|
|
import base64
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import subprocess
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
from agents.base_agent import AgentContext, AgentResult, BaseAgent
|
|
|
|
|
|
@dataclass
class DependencyFinding:
    """A security finding in a dependency."""

    package: str  # package name as it appears in the manifest
    version: str  # affected version, or "unknown" when not pinned
    severity: str  # CRITICAL, HIGH, MEDIUM, LOW
    vulnerability_id: str  # CVE, GHSA, etc.
    title: str  # short one-line summary of the advisory
    description: str  # longer human-readable explanation
    fixed_version: str | None = None  # first version containing the fix, if known
    references: list[str] = field(default_factory=list)  # advisory / changelog URLs
|
|
|
|
|
|
@dataclass
class DependencyReport:
    """Report of dependency analysis."""

    total_packages: int  # count of packages parsed across all manifests
    vulnerable_packages: int  # packages matching a known-vulnerability constraint
    outdated_packages: int  # reserved for future freshness checks (currently 0)
    findings: list[DependencyFinding]  # individual vulnerability findings
    recommendations: list[str]  # high-level remediation suggestions
    files_scanned: list[str]  # repo paths of the manifests that were scanned
|
|
|
|
|
|
class DependencyAgent(BaseAgent):
    """Agent for scanning dependencies for security vulnerabilities.

    Triggered either by pull requests that modify a recognized dependency
    manifest, or by the ``<mention_prefix> check-deps`` comment command.
    Parsed packages are checked against a small built-in table of known
    vulnerable versions; optional external scanners (pip-audit, npm audit)
    can contribute additional findings.  The resulting report is posted as
    an issue/PR comment (updated in place on re-runs via a hidden marker).
    """

    # Hidden HTML marker embedded in posted comments so upsert_comment can
    # find and refresh a previous scan report instead of posting duplicates.
    DEP_AI_MARKER = "<!-- AI_DEPENDENCY_SCAN -->"

    # Dependency manifests recognized per ecosystem.  Entries starting with
    # "*" are suffix patterns (only honored by _is_dependency_file; they are
    # skipped when fetching files because they cannot be fetched by name).
    DEPENDENCY_FILES = {
        "python": ["requirements.txt", "Pipfile", "pyproject.toml", "setup.py"],
        "javascript": ["package.json", "package-lock.json", "yarn.lock"],
        "ruby": ["Gemfile", "Gemfile.lock"],
        "go": ["go.mod", "go.sum"],
        "rust": ["Cargo.toml", "Cargo.lock"],
        "java": ["pom.xml", "build.gradle", "build.gradle.kts"],
        "php": ["composer.json", "composer.lock"],
        "dotnet": ["*.csproj", "packages.config", "*.fsproj"],
    }

    # Built-in offline vulnerability table: ecosystem -> lowercase package
    # name -> {version constraint: description}.  Only "< X" constraints are
    # understood (see _version_matches_constraint).
    KNOWN_VULNERABILITIES = {
        "python": {
            "requests": {
                "< 2.31.0": "CVE-2023-32681 - Proxy-Authorization header leak"
            },
            "urllib3": {
                "< 2.0.7": "CVE-2023-45803 - Request body not stripped on redirects"
            },
            "cryptography": {"< 41.0.0": "Multiple CVEs - Update recommended"},
            "pillow": {"< 10.0.0": "CVE-2023-4863 - WebP vulnerability"},
            "django": {"< 4.2.0": "Multiple security fixes"},
            "flask": {"< 2.3.0": "Security improvements"},
            "pyyaml": {"< 6.0": "CVE-2020-14343 - Arbitrary code execution"},
            "jinja2": {"< 3.1.0": "Security fixes"},
        },
        "javascript": {
            "lodash": {"< 4.17.21": "CVE-2021-23337 - Prototype pollution"},
            "axios": {"< 1.6.0": "CVE-2023-45857 - CSRF vulnerability"},
            "express": {"< 4.18.0": "Security updates"},
            "jquery": {"< 3.5.0": "XSS vulnerabilities"},
            "minimist": {"< 1.2.6": "Prototype pollution"},
            "node-fetch": {"< 3.3.0": "Security fixes"},
        },
    }

    def can_handle(self, event_type: str, event_data: dict) -> bool:
        """Check if this agent handles the given event.

        Returns True for PR "opened"/"synchronize" events that touch a
        known dependency manifest, and for issue comments containing the
        "<mention_prefix> check-deps" command.
        """
        agent_config = self.config.get("agents", {}).get("dependency", {})
        if not agent_config.get("enabled", True):
            return False

        # Handle PR events that modify dependency files.
        if event_type == "pull_request":
            action = event_data.get("action", "")
            if action in ["opened", "synchronize"]:
                files = event_data.get("files", [])
                for f in files:
                    if self._is_dependency_file(f.get("filename", "")):
                        return True

        # Handle the "@codebot check-deps" command.  Lowercase both sides so
        # a mixed-case configured prefix (e.g. "@CodeBot") still matches the
        # lowercased comment body.
        if event_type == "issue_comment":
            comment_body = event_data.get("comment", {}).get("body", "")
            mention_prefix = self.config.get("interaction", {}).get(
                "mention_prefix", "@codebot"
            )
            if f"{mention_prefix} check-deps".lower() in comment_body.lower():
                return True

        return False

    def _is_dependency_file(self, filename: str) -> bool:
        """Check if a file is a dependency file.

        Matches the basename against DEPENDENCY_FILES entries; "*"-prefixed
        entries are treated as suffix patterns (e.g. "*.csproj").
        """
        basename = os.path.basename(filename)
        for lang, files in self.DEPENDENCY_FILES.items():
            for pattern in files:
                if pattern.startswith("*"):
                    if basename.endswith(pattern[1:]):
                        return True
                elif basename == pattern:
                    return True
        return False

    def execute(self, context: AgentContext) -> AgentResult:
        """Execute the dependency agent.

        Collects dependency manifests from the repository, analyzes them
        against the built-in vulnerability table, optionally runs external
        scanners, and posts (or updates) a report comment on the triggering
        issue/PR.  Always returns a successful AgentResult with summary
        counts in ``data``.
        """
        self.logger.info(f"Scanning dependencies for {context.owner}/{context.repo}")

        actions_taken = []

        # Determine if this is a command or PR event; both provide the
        # issue/PR number the report comment should be attached to.
        if context.event_type == "issue_comment":
            issue = context.event_data.get("issue", {})
            issue_number = issue.get("number")
            comment_author = (
                context.event_data.get("comment", {})
                .get("user", {})
                .get("login", "user")
            )
        else:
            pr = context.event_data.get("pull_request", {})
            issue_number = pr.get("number")
            comment_author = None  # no @-mention on PR-triggered reports

        # Collect dependency files.
        dep_files = self._collect_dependency_files(context.owner, context.repo)
        if not dep_files:
            message = "No dependency files found in repository."
            if issue_number:
                self.gitea.create_issue_comment(
                    context.owner,
                    context.repo,
                    issue_number,
                    f"{self.AI_DISCLAIMER}\n\n{message}",
                )
            return AgentResult(
                success=True,
                message=message,
            )

        actions_taken.append(f"Found {len(dep_files)} dependency files")

        # Analyze dependencies against the built-in vulnerability table.
        report = self._analyze_dependencies(context.owner, context.repo, dep_files)
        actions_taken.append(f"Analyzed {report.total_packages} packages")

        # Run external scanners if available (opt-in via config).
        external_findings = self._run_external_scanners(context.owner, context.repo)
        if external_findings:
            report.findings.extend(external_findings)
            actions_taken.append(
                f"External scanner found {len(external_findings)} issues"
            )

        # Generate and post the report, replacing any previous scan comment.
        if issue_number:
            comment = self._format_dependency_report(report, comment_author)
            self.upsert_comment(
                context.owner,
                context.repo,
                issue_number,
                comment,
                marker=self.DEP_AI_MARKER,
            )
            actions_taken.append("Posted dependency report")

        return AgentResult(
            success=True,
            message=f"Dependency scan complete: {report.vulnerable_packages} vulnerable, {report.outdated_packages} outdated",
            data={
                "total_packages": report.total_packages,
                "vulnerable_packages": report.vulnerable_packages,
                "outdated_packages": report.outdated_packages,
                "findings_count": len(report.findings),
            },
            actions_taken=actions_taken,
        )

    def _collect_dependency_files(
        self, owner: str, repo: str
    ) -> dict[str, dict[str, Any]]:
        """Collect all dependency files from the repository.

        Probes a fixed set of common directories for every known manifest
        name and returns a mapping of
        ``filepath -> {"language": ecosystem, "content": decoded text}``.
        Missing files are silently skipped.
        """
        dep_files = {}

        # Common paths to check; nested service layouts beyond these are
        # not discovered (no recursive tree walk).
        paths_to_check = [
            "",  # Root
            "backend/",
            "frontend/",
            "api/",
            "services/",
        ]

        for base_path in paths_to_check:
            for lang, filenames in self.DEPENDENCY_FILES.items():
                for filename in filenames:
                    if filename.startswith("*"):
                        continue  # Glob patterns cannot be fetched by name

                    # BUGFIX: the path must be built from the manifest name;
                    # previously a corrupted literal made every lookup miss.
                    filepath = f"{base_path}{filename}".lstrip("/")
                    try:
                        content_data = self.gitea.get_file_contents(
                            owner, repo, filepath
                        )
                        # Gitea returns file bodies base64-encoded.
                        if content_data.get("content"):
                            content = base64.b64decode(content_data["content"]).decode(
                                "utf-8", errors="ignore"
                            )
                            dep_files[filepath] = {
                                "language": lang,
                                "content": content,
                            }
                    except Exception:
                        pass  # File doesn't exist

        return dep_files

    def _analyze_dependencies(
        self, owner: str, repo: str, dep_files: dict[str, dict[str, Any]]
    ) -> DependencyReport:
        """Analyze dependency files for vulnerabilities.

        Parses each manifest (Python and JavaScript only; other ecosystems
        are counted as zero packages), matches packages against
        KNOWN_VULNERABILITIES, and returns an aggregated DependencyReport.
        """
        findings = []
        total_packages = 0
        vulnerable_count = 0
        outdated_count = 0  # outdated detection not implemented yet
        recommendations = []
        files_scanned = list(dep_files.keys())

        for filepath, file_info in dep_files.items():
            lang = file_info["language"]
            content = file_info["content"]

            if lang == "python":
                packages = self._parse_python_deps(content, filepath)
            elif lang == "javascript":
                packages = self._parse_javascript_deps(content, filepath)
            else:
                packages = []

            total_packages += len(packages)

            # Check for known vulnerabilities in this ecosystem's table.
            known_vulns = self.KNOWN_VULNERABILITIES.get(lang, {})
            for pkg_name, version in packages:
                if pkg_name.lower() in known_vulns:
                    vuln_info = known_vulns[pkg_name.lower()]
                    for version_constraint, vuln_desc in vuln_info.items():
                        if self._version_matches_constraint(
                            version, version_constraint
                        ):
                            findings.append(
                                DependencyFinding(
                                    package=pkg_name,
                                    version=version or "unknown",
                                    severity="HIGH",
                                    # Descriptions follow "<ID> - <summary>";
                                    # fall back to a generic id otherwise.
                                    vulnerability_id=vuln_desc.split(" - ")[0]
                                    if " - " in vuln_desc
                                    else "VULN",
                                    title=vuln_desc,
                                    description=f"Package {pkg_name} version {version} has known vulnerabilities",
                                    fixed_version=version_constraint.replace("< ", ""),
                                )
                            )
                            vulnerable_count += 1

        # Add high-level recommendations.
        if vulnerable_count > 0:
            recommendations.append(
                f"Update {vulnerable_count} packages with known vulnerabilities"
            )
        if total_packages > 50:
            recommendations.append(
                "Consider auditing dependencies to reduce attack surface"
            )

        return DependencyReport(
            total_packages=total_packages,
            vulnerable_packages=vulnerable_count,
            outdated_packages=outdated_count,
            findings=findings,
            recommendations=recommendations,
            files_scanned=files_scanned,
        )

    def _parse_python_deps(
        self, content: str, filepath: str
    ) -> list[tuple[str, str | None]]:
        """Parse Python dependency file.

        Supports requirements.txt (name + optional version spec) and a
        line-oriented best-effort scan of pyproject.toml dependency tables.
        Returns (package_name, version_or_None) tuples.  Extras markers like
        "pkg[extra]" and environment markers are not handled.
        """
        packages = []

        if "requirements" in filepath.lower():
            # requirements.txt format
            for line in content.splitlines():
                line = line.strip()
                # Skip blanks, comments, and pip options (-r, -e, --hash, ...).
                if not line or line.startswith("#") or line.startswith("-"):
                    continue

                # Parse package==version, package>=version, package
                match = re.match(r"([a-zA-Z0-9_-]+)([<>=!]+)?(.+)?", line)
                if match:
                    pkg_name = match.group(1)
                    version = match.group(3) if match.group(3) else None
                    packages.append((pkg_name, version))

        elif filepath.endswith("pyproject.toml"):
            # pyproject.toml format: capture names between the dependencies
            # header and the next table header.  Versions are not extracted.
            in_deps = False
            for line in content.splitlines():
                if (
                    "[project.dependencies]" in line
                    or "[tool.poetry.dependencies]" in line
                ):
                    in_deps = True
                    continue
                if in_deps:
                    if line.startswith("["):
                        in_deps = False
                        continue
                    match = re.match(r'"?([a-zA-Z0-9_-]+)"?\s*[=<>]', line)
                    if match:
                        packages.append((match.group(1), None))

        return packages

    def _parse_javascript_deps(
        self, content: str, filepath: str
    ) -> list[tuple[str, str | None]]:
        """Parse JavaScript dependency file.

        Only package.json is parsed (dependencies + devDependencies);
        lockfiles are collected but not analyzed.  Returns
        (package_name, cleaned_version) tuples.
        """
        packages = []

        if filepath.endswith("package.json"):
            try:
                data = json.loads(content)
                for dep_type in ["dependencies", "devDependencies"]:
                    deps = data.get(dep_type, {})
                    for name, version in deps.items():
                        # Strip semver range prefixes like ^, ~, >=
                        clean_version = re.sub(r"^[\^~>=<]+", "", version)
                        packages.append((name, clean_version))
            except json.JSONDecodeError:
                pass  # malformed manifest: report zero packages for it

        return packages

    def _version_matches_constraint(self, version: str | None, constraint: str) -> bool:
        """Check if version matches a vulnerability constraint.

        Only "< X" constraints are supported; anything else returns False.
        An unknown version is conservatively treated as vulnerable.
        """
        if not version:
            return True  # Assume vulnerable if version unknown

        # Simple version comparison
        if constraint.startswith("< "):
            target = constraint[2:]
            try:
                return self._compare_versions(version, target) < 0
            except Exception:
                return False

        return False

    def _compare_versions(self, v1: str, v2: str) -> int:
        """Compare two version strings. Returns -1, 0, or 1.

        Numeric dotted comparison only; pre-release/build suffixes are
        stripped, and unparsable versions compare equal (returns 0).
        """

        def normalize(v):
            # Keep only digits and dots, then split into integer components.
            return [int(x) for x in re.sub(r"[^0-9.]", "", v).split(".") if x]

        try:
            parts1 = normalize(v1)
            parts2 = normalize(v2)

            # Pad the shorter version with zeros (1.2 == 1.2.0).
            for i in range(max(len(parts1), len(parts2))):
                p1 = parts1[i] if i < len(parts1) else 0
                p2 = parts2[i] if i < len(parts2) else 0
                if p1 < p2:
                    return -1
                if p1 > p2:
                    return 1
            return 0
        except Exception:
            return 0

    def _run_external_scanners(self, owner: str, repo: str) -> list[DependencyFinding]:
        """Run external vulnerability scanners if available.

        pip-audit and npm audit are each opt-in via agent config and run
        as subprocesses in the service's working directory (NOTE(review):
        they scan the local environment, not the remote repo — confirm the
        deployment checks out the repository first).  Scanner absence or
        failure is logged at debug level and never raises.
        """
        findings = []
        agent_config = self.config.get("agents", {}).get("dependency", {})

        # Try pip-audit for Python
        if agent_config.get("pip_audit", False):
            try:
                result = subprocess.run(
                    ["pip-audit", "--format", "json"],
                    capture_output=True,
                    text=True,
                    timeout=60,
                )
                if result.returncode == 0:
                    # Assumes a top-level "vulnerabilities" list in the JSON
                    # output — TODO confirm against the pip-audit version in use.
                    data = json.loads(result.stdout)
                    for vuln in data.get("vulnerabilities", []):
                        findings.append(
                            DependencyFinding(
                                package=vuln.get("name", ""),
                                version=vuln.get("version", ""),
                                severity=vuln.get("severity", "MEDIUM"),
                                vulnerability_id=vuln.get("id", ""),
                                title=vuln.get("description", "")[:100],
                                description=vuln.get("description", ""),
                                fixed_version=vuln.get("fix_versions", [None])[0],
                            )
                        )
            except Exception as e:
                self.logger.debug(f"pip-audit not available: {e}")

        # Try npm audit for JavaScript
        if agent_config.get("npm_audit", False):
            try:
                result = subprocess.run(
                    ["npm", "audit", "--json"],
                    capture_output=True,
                    text=True,
                    timeout=60,
                )
                data = json.loads(result.stdout)
                for vuln_id, vuln in data.get("vulnerabilities", {}).items():
                    # BUGFIX: npm emits "fixAvailable" as a boolean when no
                    # specific fix version applies; calling .get() on a bool
                    # raised and aborted the whole loop via the except below.
                    fix_info = vuln.get("fixAvailable")
                    fixed_version = (
                        fix_info.get("version")
                        if isinstance(fix_info, dict)
                        else None
                    )
                    findings.append(
                        DependencyFinding(
                            package=vuln.get("name", vuln_id),
                            version=vuln.get("range", ""),
                            severity=vuln.get("severity", "moderate").upper(),
                            vulnerability_id=vuln_id,
                            title=vuln.get("title", ""),
                            description=vuln.get("overview", ""),
                            fixed_version=fixed_version,
                        )
                    )
            except Exception as e:
                self.logger.debug(f"npm audit not available: {e}")

        return findings

    def _format_dependency_report(
        self, report: DependencyReport, user: str | None = None
    ) -> str:
        """Format the dependency report as a comment.

        Produces the full Markdown body: optional @-mention, disclaimer,
        summary table, findings grouped by severity (capped at 10 per
        group), files scanned, recommendations, and an overall status line.
        """
        lines = []

        if user:
            lines.append(f"@{user}")
            lines.append("")

        lines.extend(
            [
                f"{self.AI_DISCLAIMER}",
                "",
                "## 🔍 Dependency Security Scan",
                "",
                "### Summary",
                "",
                "| Metric | Value |",
                "|--------|-------|",
                f"| Total Packages | {report.total_packages} |",
                f"| Vulnerable | {report.vulnerable_packages} |",
                f"| Outdated | {report.outdated_packages} |",
                f"| Files Scanned | {len(report.files_scanned)} |",
                "",
            ]
        )

        # Findings by severity
        if report.findings:
            lines.append("### 🚨 Security Findings")
            lines.append("")

            # Group by severity; unrecognized severities are dropped.
            by_severity = {"CRITICAL": [], "HIGH": [], "MEDIUM": [], "LOW": []}
            for finding in report.findings:
                sev = finding.severity.upper()
                if sev in by_severity:
                    by_severity[sev].append(finding)

            severity_emoji = {
                "CRITICAL": "🔴",
                "HIGH": "🟠",
                "MEDIUM": "🟡",
                "LOW": "🔵",
            }

            for severity in ["CRITICAL", "HIGH", "MEDIUM", "LOW"]:
                findings = by_severity[severity]
                if findings:
                    lines.append(f"#### {severity_emoji[severity]} {severity}")
                    lines.append("")
                    for f in findings[:10]:  # Limit display
                        lines.append(f"- **{f.package}** `{f.version}`")
                        lines.append(f"  - {f.vulnerability_id}: {f.title}")
                        if f.fixed_version:
                            lines.append(f"  - ✅ Fix: Upgrade to `{f.fixed_version}`")
                    if len(findings) > 10:
                        lines.append(f"  - ... and {len(findings) - 10} more")
                    lines.append("")

        # Files scanned
        lines.append("### 📁 Files Scanned")
        lines.append("")
        for f in report.files_scanned:
            lines.append(f"- `{f}`")
        lines.append("")

        # Recommendations
        if report.recommendations:
            lines.append("### 💡 Recommendations")
            lines.append("")
            for rec in report.recommendations:
                lines.append(f"- {rec}")
            lines.append("")

        # Overall status
        if report.vulnerable_packages == 0:
            lines.append("---")
            lines.append("✅ **No known vulnerabilities detected**")
        else:
            lines.append("---")
            lines.append(
                f"⚠️ **{report.vulnerable_packages} vulnerable packages require attention**"
            )

        return "\n".join(lines)