All checks were successful
AI Codebase Quality Review / ai-codebase-review (push) Successful in 39s
432 lines
14 KiB
Python
432 lines
14 KiB
Python
"""SAST Scanner Integration
|
|
|
|
Integrates with external SAST tools like Bandit and Semgrep
|
|
to provide comprehensive security analysis.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
# Module-level logger named after this module. SASTScanner logs through its
# own class-named logger (created in __init__) rather than this one.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class SASTFinding:
|
|
"""A finding from a SAST tool."""
|
|
|
|
tool: str
|
|
rule_id: str
|
|
severity: str # CRITICAL, HIGH, MEDIUM, LOW
|
|
file: str
|
|
line: int
|
|
message: str
|
|
code_snippet: str | None = None
|
|
cwe: str | None = None
|
|
owasp: str | None = None
|
|
fix_recommendation: str | None = None
|
|
|
|
|
|
@dataclass
class SASTReport:
    """Combined report from all SAST tools."""

    # Number of entries in ``findings``.
    total_findings: int
    # Count of findings per severity label (CRITICAL / HIGH / MEDIUM / LOW).
    findings_by_severity: dict[str, int]
    # Count of findings per tool name.
    findings_by_tool: dict[str, int]
    # Every finding collected from the tools that ran.
    findings: list[SASTFinding]
    # Names of the tools that were executed.
    tools_run: list[str]
    # Human-readable error messages, one per tool that failed.
    # default_factory gives each report its own list instead of a shared one.
    errors: list[str] = field(default_factory=list)
|
|
|
|
|
|
class SASTScanner:
    """Aggregator for multiple SAST tools.

    Runs the external command-line scanners that are enabled in the
    configuration and installed on ``PATH`` (Bandit, Semgrep and, when
    explicitly enabled, Trivy) and converts their JSON output into
    ``SASTFinding`` objects.
    """

    # Supported tools in execution order: (name, enabled by default, label).
    # ``name`` is at once the key under ``security.sast`` in the config, the
    # executable looked up on PATH, the suffix of the ``_run_<name>`` method
    # and the entry recorded in ``SASTReport.tools_run``.
    _TOOLS = (
        ("bandit", True, "Bandit"),
        ("semgrep", True, "Semgrep"),
        ("trivy", False, "Trivy"),
    )

    # Semgrep severity -> report severity. ERROR/WARNING/INFO are Semgrep's
    # classic labels; the remaining labels are passed through unchanged so a
    # rule that already uses them is not downgraded to MEDIUM.
    _SEMGREP_SEVERITY = {
        "ERROR": "HIGH",
        "WARNING": "MEDIUM",
        "INFO": "LOW",
        "CRITICAL": "CRITICAL",
        "HIGH": "HIGH",
        "MEDIUM": "MEDIUM",
        "LOW": "LOW",
    }

    def __init__(self, config: dict | None = None):
        """Initialize the SAST scanner.

        Args:
            config: Configuration dictionary with tool settings. Tool options
                are read from ``config["security"]["sast"]``.
        """
        self.config = config or {}
        self.logger = logging.getLogger(self.__class__.__name__)

    def _sast_config(self) -> dict:
        """Return the ``security.sast`` config section, or ``{}`` if absent/null."""
        security = self.config.get("security") or {}
        return security.get("sast") or {}

    def scan_directory(self, path: str) -> SASTReport:
        """Scan a directory with all enabled SAST tools.

        A tool runs when its config flag is truthy (Bandit and Semgrep default
        to enabled, Trivy to disabled) and its executable is on ``PATH``.
        A tool that fails does not abort the scan: its error is recorded in
        ``SASTReport.errors`` and the tool is left out of ``tools_run``.

        Args:
            path: Path to the directory to scan.

        Returns:
            Combined SASTReport from all tools.
        """
        all_findings: list[SASTFinding] = []
        tools_run: list[str] = []
        errors: list[str] = []

        sast_config = self._sast_config()

        for name, enabled_by_default, label in self._TOOLS:
            if not sast_config.get(name, enabled_by_default):
                continue
            if not self._is_tool_available(name):
                self.logger.debug("%s not installed, skipping", label)
                continue
            try:
                # Looked up by name at call time so overriding or patching
                # ``_run_<name>`` keeps working.
                tool_findings = getattr(self, f"_run_{name}")(path)
            except Exception as e:
                errors.append(f"{label} error: {e}")
                continue
            all_findings.extend(tool_findings)
            tools_run.append(name)

        # Calculate statistics. Severities outside the four known labels are
        # not counted per severity but still count towards the total.
        by_severity = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0}
        by_tool: dict[str, int] = {}

        for finding in all_findings:
            sev = finding.severity.upper()
            if sev in by_severity:
                by_severity[sev] += 1
            by_tool[finding.tool] = by_tool.get(finding.tool, 0) + 1

        return SASTReport(
            total_findings=len(all_findings),
            findings_by_severity=by_severity,
            findings_by_tool=by_tool,
            findings=all_findings,
            tools_run=tools_run,
            errors=errors,
        )

    def scan_content(self, content: str, filename: str) -> list[SASTFinding]:
        """Scan file content with SAST tools.

        The content is written to a file inside a private temporary directory
        and only that directory is scanned, so unrelated files in the system
        temp directory are never analysed. The directory is removed afterwards
        even when writing or scanning fails.

        Args:
            content: File content to scan.
            filename: Name of the file (for language detection); only its
                extension is used for the temporary copy.

        Returns:
            List of SASTFinding objects with ``file`` set to ``filename``.
        """
        temp_name = "snippet" + os.path.splitext(filename)[1]

        with tempfile.TemporaryDirectory(prefix="sast-scan-") as scan_dir:
            temp_path = os.path.join(scan_dir, temp_name)
            with open(temp_path, "w", encoding="utf-8") as handle:
                handle.write(content)
            report = self.scan_directory(scan_dir)

        # Keep only findings reported against our temporary file
        findings = [
            finding
            for finding in report.findings
            if os.path.basename(finding.file) == temp_name
        ]
        # Update file path to original filename
        for finding in findings:
            finding.file = filename
        return findings

    def scan_diff(self, diff: str) -> list[SASTFinding]:
        """Scan a diff for security issues.

        Only scans added/modified lines. The added lines of each file are
        scanned as one piece of content; reported line numbers are then
        mapped back to the line numbers of the new file version taken from
        the hunk headers.

        Args:
            diff: Git diff content.

        Returns:
            List of SASTFinding objects.
        """
        findings: list[SASTFinding] = []

        for filename, added in self._parse_added_lines(diff).items():
            content = "\n".join(text for _, text in added)
            if not content.strip():
                continue

            new_file_lines = [number for number, _ in added]
            for finding in self.scan_content(content, filename):
                # finding.line is 1-based within the scanned content; tools
                # that report no line (0) are left untouched.
                if isinstance(finding.line, int) and 1 <= finding.line <= len(new_file_lines):
                    finding.line = new_file_lines[finding.line - 1]
                findings.append(finding)

        return findings

    @staticmethod
    def _parse_added_lines(diff: str) -> dict[str, list[tuple[int, str]]]:
        """Extract the added lines of a git diff, grouped by file.

        Returns a mapping of file name to ``(new_file_line_number, text)``
        pairs with the leading ``+`` removed. Files without added lines are
        omitted. A hunk is considered to run until the next ``@@`` or
        ``diff --git`` line.
        """
        added_by_file: dict[str, list[tuple[int, str]]] = {}
        current_file: str | None = None
        current_lines: list[tuple[int, str]] = []
        in_hunk = False
        new_line = 1

        for line in diff.splitlines():
            if line.startswith("diff --git"):
                if current_file and current_lines:
                    added_by_file[current_file] = current_lines
                current_lines = []
                in_hunk = False
                new_line = 1
                # Extract filename
                parts = line.split(" b/")
                current_file = parts[1] if len(parts) > 1 else None
            elif line.startswith("@@"):
                in_hunk = True
                # "@@ -a,b +c,d @@": c is the first new-file line of the hunk.
                for token in line.split():
                    if token.startswith("+"):
                        start = token[1:].split(",")[0]
                        if start.isdigit():
                            new_line = int(start)
                        break
            elif line.startswith("+"):
                if not in_hunk and line.startswith("+++"):
                    # File header. "+++ b/<path>" is unambiguous even when the
                    # path itself contains " b/", so prefer it when present.
                    if line.startswith("+++ b/"):
                        header_path = line[6:].split("\t", 1)[0]
                        if header_path and header_path != current_file:
                            if current_file and current_lines:
                                added_by_file[current_file] = current_lines
                            current_file = header_path
                            current_lines = []
                            new_line = 1
                    continue
                # Inside a hunk every "+" line is content, including added
                # text that itself starts with "++".
                if current_file:
                    current_lines.append((new_line, line[1:]))  # Remove + prefix
                new_line += 1
            elif in_hunk and not line.startswith(("-", "\\")):
                # Context line: present in the new file, so it advances the
                # new-file line counter. Removed lines and the
                # "\ No newline at end of file" marker do not.
                new_line += 1

        # Don't forget last file
        if current_file and current_lines:
            added_by_file[current_file] = current_lines

        return added_by_file

    def _is_tool_available(self, tool: str) -> bool:
        """Check if a tool is installed and available on PATH."""
        return shutil.which(tool) is not None

    def _run_json_tool(self, label: str, command: list[str], timeout: int) -> dict[str, Any]:
        """Run a scanner command and return its stdout parsed as a JSON object.

        Args:
            label: Display name of the tool, used in log messages.
            command: Argument list passed to ``subprocess.run`` (no shell).
            timeout: Maximum run time in seconds.

        Returns:
            The parsed JSON object, or ``{}`` when the tool printed nothing
            and exited successfully, or printed JSON that is not an object.

        Raises:
            subprocess.TimeoutExpired: The tool exceeded ``timeout``.
            OSError: The tool could not be started.
            RuntimeError: The tool printed nothing and exited non-zero.
            json.JSONDecodeError: The output is not valid JSON.
        """
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            self.logger.warning("%s scan timed out", label)
            raise
        except OSError as e:
            self.logger.warning("%s scan failed: %s", label, e)
            raise

        if not result.stdout:
            # A non-zero exit alone is not an error (scanners may use it to
            # signal "issues found"), but non-zero with no report at all is.
            if result.returncode != 0:
                detail = (result.stderr or "").strip() or "no output"
                self.logger.warning(
                    "%s scan failed: exit status %s: %s", label, result.returncode, detail
                )
                raise RuntimeError(f"exited with status {result.returncode}: {detail}")
            return {}

        try:
            data = json.loads(result.stdout)
        except json.JSONDecodeError as e:
            self.logger.warning("Failed to parse %s output: %s", label, e)
            raise

        return data if isinstance(data, dict) else {}

    @staticmethod
    def _first_entry(value: Any) -> str | None:
        """Return a metadata value that may be a string or a list of strings.

        A string is returned as is, a non-empty list yields its first element
        and a missing/empty value yields ``None``.
        """
        if not value:
            return None
        if isinstance(value, str):
            return value
        if isinstance(value, (list, tuple)):
            return value[0]
        return str(value)

    def _run_bandit(self, path: str) -> list[SASTFinding]:
        """Run Bandit security scanner.

        Args:
            path: Path to scan.

        Returns:
            List of SASTFinding objects.

        Raises:
            Exception: Any failure to run Bandit or read its report; see
                ``_run_json_tool``. ``scan_directory`` records these in the
                report's ``errors`` list.
        """
        data = self._run_json_tool(
            "Bandit",
            [
                "bandit",
                "-r",
                path,
                "-f",
                "json",
                "-ll",  # Only high and medium severity
                "--quiet",
            ],
            timeout=120,
        )

        findings = []
        for issue in data.get("results") or []:
            cwe_info = issue.get("issue_cwe")
            findings.append(
                SASTFinding(
                    tool="bandit",
                    rule_id=issue.get("test_id", ""),
                    severity=(issue.get("issue_severity") or "MEDIUM").upper(),
                    file=issue.get("filename", ""),
                    line=issue.get("line_number", 0),
                    message=issue.get("issue_text", ""),
                    code_snippet=issue.get("code", ""),
                    cwe=f"CWE-{cwe_info.get('id', '')}" if cwe_info else None,
                    fix_recommendation=issue.get("more_info", ""),
                )
            )
        return findings

    def _run_semgrep(self, path: str) -> list[SASTFinding]:
        """Run Semgrep security scanner.

        The rule set is taken from ``security.sast.semgrep_rules`` in the
        config and defaults to ``p/security-audit``.

        Args:
            path: Path to scan.

        Returns:
            List of SASTFinding objects.

        Raises:
            Exception: Any failure to run Semgrep or read its report; see
                ``_run_json_tool``. ``scan_directory`` records these in the
                report's ``errors`` list.
        """
        # Get Semgrep config from settings
        semgrep_rules = self._sast_config().get("semgrep_rules", "p/security-audit")

        data = self._run_json_tool(
            "Semgrep",
            [
                "semgrep",
                "--config",
                semgrep_rules,
                "--json",
                "--quiet",
                path,
            ],
            timeout=180,
        )

        findings = []
        for result in data.get("results") or []:
            extra = result.get("extra") or {}
            metadata = extra.get("metadata") or {}

            # Map Semgrep severity to our scale; unknown labels become MEDIUM
            raw_severity = str(extra.get("severity", "WARNING")).upper()
            severity = self._SEMGREP_SEVERITY.get(raw_severity, "MEDIUM")

            findings.append(
                SASTFinding(
                    tool="semgrep",
                    rule_id=result.get("check_id", ""),
                    severity=severity,
                    file=result.get("path", ""),
                    line=(result.get("start") or {}).get("line", 0),
                    message=extra.get("message", ""),
                    code_snippet=extra.get("lines", ""),
                    # cwe/owasp may be a single string or a list of strings
                    cwe=self._first_entry(metadata.get("cwe")),
                    owasp=self._first_entry(metadata.get("owasp")),
                    fix_recommendation=metadata.get("fix", ""),
                )
            )
        return findings

    def _run_trivy(self, path: str) -> list[SASTFinding]:
        """Run Trivy filesystem scanner.

        Vulnerabilities and secrets from the report are converted to
        findings; vulnerability findings carry no line number (``line=0``).

        Args:
            path: Path to scan.

        Returns:
            List of SASTFinding objects.

        Raises:
            Exception: Any failure to run Trivy or read its report; see
                ``_run_json_tool``. ``scan_directory`` records these in the
                report's ``errors`` list.
        """
        # NOTE(review): newer Trivy releases appear to have renamed
        # --security-checks to --scanners; confirm against the deployed
        # Trivy version. The "config" results requested here are not
        # converted into findings below.
        data = self._run_json_tool(
            "Trivy",
            [
                "trivy",
                "fs",
                "--format",
                "json",
                "--security-checks",
                "vuln,secret,config",
                path,
            ],
            timeout=180,
        )

        findings = []
        # "or []" guards against keys that are present but null in the report.
        for result_item in data.get("Results") or []:
            target = result_item.get("Target", "")

            # Process vulnerabilities
            for vuln in result_item.get("Vulnerabilities") or []:
                cwe_ids = vuln.get("CweIDs")
                fixed_version = vuln.get("FixedVersion")
                findings.append(
                    SASTFinding(
                        tool="trivy",
                        rule_id=vuln.get("VulnerabilityID", ""),
                        severity=(vuln.get("Severity") or "MEDIUM").upper(),
                        file=target,
                        line=0,
                        message=vuln.get("Title", ""),
                        cwe=cwe_ids[0] if cwe_ids else None,
                        fix_recommendation=f"Upgrade to {fixed_version}"
                        if fixed_version
                        else None,
                    )
                )

            # Process secrets
            for secret in result_item.get("Secrets") or []:
                findings.append(
                    SASTFinding(
                        tool="trivy",
                        rule_id=secret.get("RuleID", ""),
                        # Use the severity Trivy reports; HIGH when absent.
                        severity=(secret.get("Severity") or "HIGH").upper(),
                        file=target,
                        line=secret.get("StartLine", 0),
                        message=f"Secret detected: {secret.get('Title', '')}",
                        code_snippet=secret.get("Match", ""),
                    )
                )

        return findings
|
|
|
|
|
|
def get_sast_scanner(config: dict | None = None) -> SASTScanner:
    """Get a configured SAST scanner instance.

    Args:
        config: Configuration dictionary, passed unchanged to ``SASTScanner``.

    Returns:
        Configured SASTScanner instance.
    """
    return SASTScanner(config=config)
|