Files
openrabbit/tools/ai-review/security/sast_scanner.py
latte e8d28225e0
All checks were successful
AI Codebase Quality Review / ai-codebase-review (push) Successful in 39s
just why not
2026-01-07 21:19:46 +01:00

432 lines
14 KiB
Python

"""SAST Scanner Integration
Integrates with external SAST tools such as Bandit, Semgrep and (optionally) Trivy,
merging their findings into one combined report to provide comprehensive security analysis.
"""
import json
import logging
import os
import shutil
import subprocess
import tempfile
from dataclasses import dataclass, field
from typing import Any
# Module-level logger named after this module. SASTScanner additionally
# creates its own class-named logger in __init__ and logs through that one.
logger = logging.getLogger(__name__)
@dataclass
class SASTFinding:
    """One issue reported by a single SAST tool, in a tool-independent shape.

    Attributes:
        tool: Name of the tool that produced the finding (e.g. "bandit").
        rule_id: Identifier of the rule/check that fired.
        severity: Severity label; the report counts CRITICAL, HIGH, MEDIUM
            and LOW.
        file: Path of the affected file as reported by the tool.
        line: Line number reported by the tool (0 when none is available).
        message: Human-readable description of the issue.
        code_snippet: Offending source excerpt, when the tool provides one.
        cwe: CWE identifier, when known.
        owasp: OWASP category, when known.
        fix_recommendation: Remediation hint, when the tool provides one.
    """

    # Required fields: every tool adapter must supply these.
    tool: str
    rule_id: str
    severity: str  # CRITICAL, HIGH, MEDIUM, LOW
    file: str
    line: int
    message: str

    # Optional enrichment: absent unless the originating tool reports it.
    code_snippet: str | None = None
    cwe: str | None = None
    owasp: str | None = None
    fix_recommendation: str | None = None
@dataclass
class SASTReport:
    """Aggregated outcome of one scan across every SAST tool that was run.

    Attributes:
        total_findings: Number of entries in ``findings``.
        findings_by_severity: Finding count keyed by severity label.
        findings_by_tool: Finding count keyed by tool name.
        findings: All findings collected from the tools.
        tools_run: Names of the tools that contributed to this report.
        errors: Error messages collected while running the tools; a fresh
            empty list per report unless one is supplied.
    """

    # Summary counters.
    total_findings: int
    findings_by_severity: dict[str, int]
    findings_by_tool: dict[str, int]

    # Raw results and provenance.
    findings: list[SASTFinding]
    tools_run: list[str]

    # default_factory gives each report its own list instead of a shared one.
    errors: list[str] = field(default_factory=list)
class SASTScanner:
    """Aggregator for multiple SAST tools.

    Runs the external command-line scanners Bandit, Semgrep and (when
    enabled) Trivy, converts their JSON output into ``SASTFinding`` objects
    and merges everything into one ``SASTReport``.

    Tool selection is read from ``config["security"]["sast"]``:

    * ``bandit``  (default ``True``)
    * ``semgrep`` (default ``True``)
    * ``trivy``   (default ``False``)
    * ``semgrep_rules`` (default ``"p/security-audit"``): one rule set, or
      a list of rule sets, passed to ``semgrep --config``.
    """

    # Semgrep severities mapped onto this module's scale. Both the legacy
    # labels (ERROR/WARNING/INFO) and the CRITICAL/HIGH/MEDIUM/LOW labels are
    # accepted; anything else falls back to MEDIUM at the call site.
    _SEMGREP_SEVERITY = {
        "ERROR": "HIGH",
        "WARNING": "MEDIUM",
        "INFO": "LOW",
        "CRITICAL": "CRITICAL",
        "HIGH": "HIGH",
        "MEDIUM": "MEDIUM",
        "LOW": "LOW",
    }

    def __init__(self, config: dict | None = None):
        """Initialize the SAST scanner.

        Args:
            config: Configuration dictionary with tool settings. ``None`` is
                treated as an empty configuration.
        """
        self.config = config or {}
        self.logger = logging.getLogger(self.__class__.__name__)

    def scan_directory(self, path: str) -> SASTReport:
        """Scan a directory with all enabled SAST tools.

        A tool is run only when it is enabled in the configuration and its
        executable is found on ``PATH``. A tool that fails (timeout,
        unparsable output, error exit without output) does not abort the
        scan: the failure is logged, recorded in ``SASTReport.errors`` and
        the tool is left out of ``SASTReport.tools_run``.

        Args:
            path: Path to the directory to scan.

        Returns:
            Combined SASTReport from all tools.
        """
        sast_config = self._sast_config()

        # (config key == executable name, display name, enabled by default, runner)
        tools = (
            ("bandit", "Bandit", True, self._run_bandit),
            ("semgrep", "Semgrep", True, self._run_semgrep),
            ("trivy", "Trivy", False, self._run_trivy),
        )

        all_findings: list[SASTFinding] = []
        tools_run: list[str] = []
        errors: list[str] = []

        for key, label, enabled_by_default, runner in tools:
            if not sast_config.get(key, enabled_by_default):
                continue
            if not self._is_tool_available(key):
                self.logger.debug("%s not installed, skipping", label)
                continue
            try:
                tool_findings = runner(path)
            except Exception as exc:  # boundary: one broken tool must not kill the scan
                self.logger.warning("%s scan failed: %s", label, exc)
                errors.append(f"{label} error: {exc}")
                continue
            all_findings.extend(tool_findings)
            tools_run.append(key)

        # Calculate statistics. Severities outside the four known labels
        # (e.g. Trivy's UNKNOWN) are kept in the findings but not counted here.
        by_severity = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0}
        by_tool: dict[str, int] = {}
        for finding in all_findings:
            level = finding.severity.upper()
            if level in by_severity:
                by_severity[level] += 1
            by_tool[finding.tool] = by_tool.get(finding.tool, 0) + 1

        return SASTReport(
            total_findings=len(all_findings),
            findings_by_severity=by_severity,
            findings_by_tool=by_tool,
            findings=all_findings,
            tools_run=tools_run,
            errors=errors,
        )

    def scan_content(self, content: str, filename: str) -> list[SASTFinding]:
        """Scan file content with SAST tools.

        The content is written to a file inside a private temporary
        directory and only that directory is scanned, so unrelated files in
        the shared system temp directory are never read. The temporary
        directory is removed afterwards. Tool errors recorded in the
        intermediate report are not returned by this method.

        Args:
            content: File content to scan.
            filename: Name of the file (its extension is reused for the
                temporary file so tools can detect the language).

        Returns:
            List of SASTFinding objects whose ``file`` is set to ``filename``.
        """
        suffix = os.path.splitext(filename)[1]
        with tempfile.TemporaryDirectory(prefix="sast_scan_") as workdir:
            target_name = f"content{suffix}"
            target_path = os.path.join(workdir, target_name)
            with open(target_path, "w", encoding="utf-8") as handle:
                handle.write(content)
            report = self.scan_directory(workdir)

        # Tools may report absolute or scan-root-relative paths; compare by
        # base name, then point each finding back at the caller's filename.
        findings = [
            finding
            for finding in report.findings
            if os.path.basename(finding.file) == target_name
        ]
        for finding in findings:
            finding.file = filename
        return findings

    def scan_diff(self, diff: str) -> list[SASTFinding]:
        """Scan a diff for security issues.

        Only scans added/modified lines. For every file in the diff the added
        lines are joined and scanned as if they were a standalone file, so
        line numbers in the returned findings refer to positions within those
        joined added lines, not to lines of the real file.

        Args:
            diff: Git diff content.

        Returns:
            List of SASTFinding objects.
        """
        findings: list[SASTFinding] = []
        for filename, content in self._added_lines_by_file(diff).items():
            if content.strip():
                findings.extend(self.scan_content(content, filename))
        return findings

    @staticmethod
    def _added_lines_by_file(diff: str) -> dict[str, str]:
        """Extract the added lines of each file from a git diff.

        Args:
            diff: Git diff content.

        Returns:
            Mapping of file name (taken from the ``diff --git`` line) to the
            file's added lines joined with newlines, ``+`` prefix removed.
            Files without added lines are omitted.
        """
        files_content: dict[str, str] = {}
        current_file: str | None = None
        added: list[str] = []
        in_hunk = False  # True once the first "@@" of the current file is seen

        for line in diff.splitlines():
            if line.startswith("diff --git"):
                # New file section: store what was collected for the previous one.
                if current_file and added:
                    files_content[current_file] = "\n".join(added)
                parts = line.split(" b/")
                current_file = parts[1] if len(parts) > 1 else None
                added = []
                in_hunk = False
            elif line.startswith("@@"):
                in_hunk = True
            elif line.startswith("+") and current_file:
                # "+++ b/file" is a header only before the first hunk; inside
                # a hunk "+++x" is an added line whose text starts with "++".
                if in_hunk or not line.startswith("+++"):
                    added.append(line[1:])

        # Don't forget the last file.
        if current_file and added:
            files_content[current_file] = "\n".join(added)
        return files_content

    def _sast_config(self) -> dict:
        """Return the ``security.sast`` config section, or ``{}`` if absent/null."""
        security = self.config.get("security") or {}
        return security.get("sast") or {}

    def _is_tool_available(self, tool: str) -> bool:
        """Check if a tool is installed and available on ``PATH``."""
        return shutil.which(tool) is not None

    @staticmethod
    def _first_str(value: Any) -> str | None:
        """Normalise a metadata field that may be a string or a list.

        Args:
            value: A string, a list/tuple of values, or ``None``.

        Returns:
            The string itself, the first list element as a string, or
            ``None`` when the value is missing or empty.
        """
        if isinstance(value, (list, tuple)):
            value = value[0] if value else None
        if value is None or value == "":
            return None
        return str(value)

    def _run_json_tool(self, command: list[str], timeout: int) -> dict:
        """Run an external tool and parse its stdout as a JSON object.

        A non-zero exit code alone is not treated as failure, because the
        output is what matters here; it is only an error when the tool also
        produced no output to parse.

        Args:
            command: Argument vector to execute (no shell involved).
            timeout: Maximum run time in seconds.

        Returns:
            The decoded JSON object, or ``{}`` when the tool printed nothing
            and exited successfully (or printed a non-object JSON value).

        Raises:
            subprocess.TimeoutExpired: The tool exceeded ``timeout``.
            json.JSONDecodeError: stdout was not valid JSON.
            RuntimeError: The tool exited non-zero without any output.
        """
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",  # odd bytes in tool output must not abort the scan
            timeout=timeout,
        )
        output = completed.stdout or ""
        if not output.strip():
            if completed.returncode != 0:
                detail = (completed.stderr or "").strip()[-500:]
                raise RuntimeError(
                    f"{command[0]} exited with code {completed.returncode}"
                    + (f": {detail}" if detail else "")
                )
            return {}
        data = json.loads(output)
        return data if isinstance(data, dict) else {}

    def _run_bandit(self, path: str) -> list[SASTFinding]:
        """Run Bandit security scanner.

        Args:
            path: Path to scan.

        Returns:
            List of SASTFinding objects.

        Raises:
            Exception: Whatever ``_run_json_tool`` raises when the tool fails;
                ``scan_directory`` catches and records it.
        """
        data = self._run_json_tool(
            [
                "bandit",
                "-r",
                path,
                "-f",
                "json",
                "-ll",  # Only high and medium severity
                "--quiet",
            ],
            timeout=120,
        )

        findings: list[SASTFinding] = []
        for issue in data.get("results") or []:
            cwe_id = (issue.get("issue_cwe") or {}).get("id")
            findings.append(
                SASTFinding(
                    tool="bandit",
                    rule_id=issue.get("test_id", ""),
                    severity=str(issue.get("issue_severity") or "MEDIUM").upper(),
                    file=issue.get("filename", ""),
                    line=issue.get("line_number", 0),
                    message=issue.get("issue_text", ""),
                    code_snippet=issue.get("code", ""),
                    cwe=f"CWE-{cwe_id}" if cwe_id else None,
                    fix_recommendation=issue.get("more_info", ""),
                )
            )
        return findings

    def _run_semgrep(self, path: str) -> list[SASTFinding]:
        """Run Semgrep security scanner.

        Args:
            path: Path to scan.

        Returns:
            List of SASTFinding objects.

        Raises:
            Exception: Whatever ``_run_json_tool`` raises when the tool fails;
                ``scan_directory`` catches and records it.
        """
        # Get Semgrep rule set(s) from settings; accept one name or a list.
        rules = self._sast_config().get("semgrep_rules", "p/security-audit")
        rule_sets = [rules] if isinstance(rules, str) else list(rules)

        command = ["semgrep"]
        for rule_set in rule_sets:
            command += ["--config", str(rule_set)]
        command += ["--json", "--quiet", path]

        data = self._run_json_tool(command, timeout=180)

        findings: list[SASTFinding] = []
        for match in data.get("results") or []:
            extra = match.get("extra") or {}
            metadata = extra.get("metadata") or {}
            raw_severity = str(extra.get("severity") or "WARNING").upper()
            findings.append(
                SASTFinding(
                    tool="semgrep",
                    rule_id=match.get("check_id", ""),
                    severity=self._SEMGREP_SEVERITY.get(raw_severity, "MEDIUM"),
                    file=match.get("path", ""),
                    line=(match.get("start") or {}).get("line", 0),
                    message=extra.get("message", ""),
                    code_snippet=extra.get("lines", ""),
                    # Rule metadata may hold a single string or a list here;
                    # indexing a string with [0] would yield one character.
                    cwe=self._first_str(metadata.get("cwe")),
                    owasp=self._first_str(metadata.get("owasp")),
                    fix_recommendation=metadata.get("fix", ""),
                )
            )
        return findings

    def _run_trivy(self, path: str) -> list[SASTFinding]:
        """Run Trivy filesystem scanner.

        Vulnerabilities, secrets and failed misconfiguration checks are all
        converted to findings.

        Args:
            path: Path to scan.

        Returns:
            List of SASTFinding objects.

        Raises:
            Exception: Whatever ``_run_json_tool`` raises when the tool fails;
                ``scan_directory`` catches and records it.
        """
        data = self._run_json_tool(
            [
                "trivy",
                "fs",
                "--format",
                "json",
                # NOTE(review): newer Trivy releases appear to name this flag
                # "--scanners" -- confirm against the installed Trivy version.
                "--security-checks",
                "vuln,secret,config",
                path,
            ],
            timeout=180,
        )

        findings: list[SASTFinding] = []
        for result_item in data.get("Results") or []:
            target = result_item.get("Target", "")

            # Process vulnerabilities
            for vuln in result_item.get("Vulnerabilities") or []:
                fixed_version = vuln.get("FixedVersion")
                findings.append(
                    SASTFinding(
                        tool="trivy",
                        rule_id=vuln.get("VulnerabilityID", ""),
                        severity=str(vuln.get("Severity") or "MEDIUM").upper(),
                        file=target,
                        line=0,
                        message=vuln.get("Title", ""),
                        cwe=self._first_str(vuln.get("CweIDs")),
                        fix_recommendation=(
                            f"Upgrade to {fixed_version}" if fixed_version else None
                        ),
                    )
                )

            # Process secrets
            for secret in result_item.get("Secrets") or []:
                findings.append(
                    SASTFinding(
                        tool="trivy",
                        rule_id=secret.get("RuleID", ""),
                        severity="HIGH",
                        file=target,
                        line=secret.get("StartLine", 0),
                        message=f"Secret detected: {secret.get('Title', '')}",
                        code_snippet=secret.get("Match", ""),
                    )
                )

            # Process misconfigurations: the "config" check is requested
            # above, so its results must not be dropped.
            # NOTE(review): field names follow Trivy's JSON report as
            # understood by the reviewer -- confirm against real output.
            for misconf in result_item.get("Misconfigurations") or []:
                if misconf.get("Status") == "PASS":
                    continue
                cause = misconf.get("CauseMetadata") or {}
                findings.append(
                    SASTFinding(
                        tool="trivy",
                        rule_id=misconf.get("ID", ""),
                        severity=str(misconf.get("Severity") or "MEDIUM").upper(),
                        file=target,
                        line=cause.get("StartLine", 0),
                        message=misconf.get("Message") or misconf.get("Title", ""),
                        fix_recommendation=misconf.get("Resolution") or None,
                    )
                )
        return findings
def get_sast_scanner(config: dict | None = None) -> SASTScanner:
    """Build a SASTScanner from an optional configuration.

    Args:
        config: Configuration dictionary forwarded to the scanner; may be
            None.

    Returns:
        A new SASTScanner instance created with ``config``.
    """
    scanner = SASTScanner(config=config)
    return scanner