security fixes
All checks were successful
Enterprise AI Code Review / ai-review (pull_request) Successful in 26s

This commit is contained in:
2025-12-28 19:55:05 +00:00
parent 4a3ddec68c
commit f94d21580c
15 changed files with 2549 additions and 46 deletions

0
tools/ai-review/security/__init__.py Normal file → Executable file
View File

View File

@@ -0,0 +1,172 @@
#!/usr/bin/env python3
"""Pre-commit hook for detecting hardcoded secrets.
Checks files for common secret patterns:
- API keys
- AWS credentials
- Private keys
- Passwords
- Tokens
"""
import re
import sys
from pathlib import Path
# Regex patterns for common credential formats.  Each entry gives the secret
# type name, the detection regex, and the severity reported on a match.
# (A duplicated 'name' key left over from a merge was removed.)
SECRET_PATTERNS = [
    {
        "name": "OpenAI API Key",
        "pattern": r"sk-[a-zA-Z0-9]{32,}",
        "severity": "HIGH",
    },
    {
        "name": "AWS Access Key",
        "pattern": r"AKIA[0-9A-Z]{16}",
        "severity": "HIGH",
    },
    {
        "name": "Private Key",
        "pattern": r"-----BEGIN[A-Z ]+PRIVATE KEY-----",
        "severity": "HIGH",
    },
    {
        "name": "Generic API Key",
        "pattern": r'(?i)(api[_-]?key|apikey)\s*[:=]\s*["\']([a-zA-Z0-9_\-]{20,})["\']',
        "severity": "HIGH",
    },
    {
        "name": "Password in Code",
        "pattern": r'(?i)password\s*[:=]\s*["\'](?!.*\{.*\})([^"\']{8,})["\']',
        "severity": "HIGH",
    },
    {
        "name": "Bearer Token",
        "pattern": r"bearer\s+[a-zA-Z0-9_\-\.]{20,}",
        "severity": "HIGH",
    },
    {
        "name": "GitHub Token",
        "pattern": r"gh[pousr]_[a-zA-Z0-9]{36,}",
        "severity": "HIGH",
    },
    {
        "name": "Slack Token",
        "pattern": r"xox[baprs]-[a-zA-Z0-9-]{10,}",
        "severity": "HIGH",
    },
]
# Patterns that mark a line as an almost-certain false positive:
# placeholders, documentation examples, and env-var indirection.
EXCLUDE_PATTERNS = [
    r"example\.com",
    r"your[_-]?api[_-]?key",
    r"your[_-]?password",
    r"<API[_-]?KEY>",
    r"\[API[_-]?KEY\]",
    r"\$\{",  # Environment variable substitution
    r"os\.environ",  # Reading from env vars
    r"secrets\.",  # GitHub secrets
    r"getenv",  # Reading from env
]


def is_false_positive(line: str) -> bool:
    """Return True when *line* matches any known false-positive pattern."""
    return any(
        re.search(exclusion, line, re.IGNORECASE) for exclusion in EXCLUDE_PATTERNS
    )
def check_file_for_secrets(filepath: str) -> list[dict]:
    """Scan one file for hardcoded secrets.

    Args:
        filepath: Path to file to check

    Returns:
        List of findings, one dict (name/severity/line/match) per regex hit
    """
    try:
        with open(filepath, "r", encoding="utf-8") as f:
            text = f.read()
    except Exception:
        # Unreadable files (binary, permissions, ...) are silently skipped.
        return []

    comment_prefixes = ("#", "//", "/*", "*", "--")
    findings: list[dict] = []
    for lineno, line in enumerate(text.split("\n"), start=1):
        # Skip comment lines in common languages.
        if line.strip().startswith(comment_prefixes):
            continue
        # Skip lines recognised as false positives.
        if is_false_positive(line):
            continue
        for spec in SECRET_PATTERNS:
            for match in re.finditer(spec["pattern"], line):
                snippet = match.group(0)
                if len(snippet) > 50:
                    # Truncate long matches so secrets are not fully echoed.
                    snippet = snippet[:50] + "..."
                findings.append(
                    {
                        "name": spec["name"],
                        "severity": spec["severity"],
                        "line": lineno,
                        "match": snippet,
                    }
                )
    return findings
def main():
    """Run secret detection over the files given on the command line."""
    paths = sys.argv[1:]
    if not paths:
        return 0

    total_findings = 0
    for path in paths:
        findings = check_file_for_secrets(path)
        if not findings:
            continue
        total_findings += len(findings)
        print(f"\n{'=' * 60}")
        print(f"🔐 Potential secrets detected in: {path}")
        print("=" * 60)
        for finding in findings:
            print(f"\n🔴 [{finding['severity']}] {finding['name']}")
            print(f" Line: {finding['line']}")
            print(f" Match: {finding['match']}")

    # Any finding at all blocks the commit; this hook only reports HIGH patterns.
    if total_findings:
        print(f"\n{'=' * 60}")
        print(f"Total potential secrets: {total_findings}")
        print("=" * 60)
        print("\n❌ COMMIT BLOCKED: Potential hardcoded secrets detected")
        print("\nIf these are false positives:")
        print(" 1. Use environment variables: os.environ.get('API_KEY')")
        print(" 2. Use a secrets manager")
        print(" 3. Add to .gitignore if it's a config file")
        print("\nTo bypass (not recommended): git commit --no-verify")
        return 1
    return 0
if __name__ == "__main__":

View File

@@ -0,0 +1,83 @@
#!/usr/bin/env python3
"""Pre-commit hook for security scanning.
Scans staged files for security vulnerabilities before commit.
Fails if HIGH severity issues are found.
"""
import sys
from pathlib import Path
from security_scanner import SecurityScanner
def main():
    """Run the security scanner over staged files passed as arguments."""
    scanner = SecurityScanner()

    paths = sys.argv[1:]
    if not paths:
        print("No files to scan")
        return 0

    severity_symbols = {"HIGH": "🔴", "MEDIUM": "🟡", "LOW": "🔵"}
    blocked = False
    total_findings = 0

    for path in paths:
        try:
            with open(path, "r", encoding="utf-8") as f:
                content = f.read()
        except Exception as e:
            print(f"Warning: Could not read {path}: {e}")
            continue

        findings = list(scanner.scan_content(content, path))
        if not findings:
            continue
        total_findings += len(findings)

        print(f"\n{'=' * 60}")
        print(f"Security findings in: {path}")
        print("=" * 60)
        for finding in findings:
            symbol = severity_symbols.get(finding.severity, "")
            print(f"\n{symbol} [{finding.severity}] {finding.name}")
            print(f" Category: {finding.category}")
            print(f" CWE: {finding.cwe}")
            print(f" Line: {finding.line}")
            print(f" Description: {finding.description}")
            print(f" Recommendation: {finding.recommendation}")
            if finding.severity == "HIGH":
                blocked = True

    if total_findings > 0:
        print(f"\n{'=' * 60}")
        print(f"Total findings: {total_findings}")
        print("=" * 60)
    # Only HIGH severity blocks the commit; lower severities just warn.
    if blocked:
        print("\n❌ COMMIT BLOCKED: HIGH severity security issues found")
        print("Please fix the issues above before committing.")
        print("\nTo bypass (not recommended): git commit --no-verify")
        return 1
    if total_findings > 0:
        print("\n⚠️ Medium/Low severity issues found - review recommended")
    return 0
if __name__ == "__main__":
sys.exit(main())

0
tools/ai-review/security/security_scanner.py Normal file → Executable file
View File

View File

@@ -0,0 +1,157 @@
#!/usr/bin/env python3
"""Pre-commit hook for validating workflow files.
Checks workflow files for security anti-patterns:
- Full webhook data in environment variables
- Missing input validation
- Unsafe shell operations
"""
import re
import sys
from pathlib import Path
import yaml
# Anti-pattern checks applied to workflow files.  Each entry names the issue,
# gives the detection regex, a severity, a remediation message, and optionally
# an "exclude_if" regex whose presence anywhere in the file suppresses the
# check (the mitigation is already in place).
# (A duplicated 'name' key left over from a merge was removed.)
SECURITY_CHECKS = [
    {
        "name": "Full webhook data in env vars",
        "pattern": r"toJSON\(github\.event\)|toJSON\(gitea\.event\)",
        "severity": "HIGH",
        "message": "Do not pass full webhook data to environment variables. Use minimal extraction instead.",
    },
    {
        "name": "Unvalidated repository input",
        "pattern": r"\$\{\{\s*(?:github|gitea)\.repository\s*\}\}",
        "severity": "MEDIUM",
        "message": "Repository name should be validated before use. Add format validation.",
        "exclude_if": r"grep -qE.*repository",  # OK if validation present
    },
    {
        "name": "Direct user input in shell",
        "pattern": r"\$\{\{\s*(?:github|gitea)\.event\.comment\.body\s*\}\}",
        "severity": "MEDIUM",
        "message": "Comment body should be properly escaped. Use jq -Rs for JSON escaping.",
        "exclude_if": r"jq -Rs",  # OK if using jq for escaping
    },
    {
        "name": "Inline Python without validation",
        "pattern": r"python -c.*json\.loads\(os\.environ",
        "severity": "HIGH",
        "message": "Use utils/safe_dispatch.py instead of inline Python with env vars.",
    },
]
def check_workflow_file(filepath: str) -> list[dict]:
    """Run the security anti-pattern checks against one workflow file.

    Args:
        filepath: Path to workflow YAML file

    Returns:
        List of findings; a single ERROR entry if the file is unreadable
        or not valid YAML
    """
    try:
        with open(filepath, "r", encoding="utf-8") as f:
            content = f.read()
    except Exception as e:
        return [{"severity": "ERROR", "message": f"Could not read file: {e}"}]

    # The file must at least parse as YAML before pattern checks run.
    try:
        yaml.safe_load(content)
    except yaml.YAMLError as e:
        return [{"severity": "ERROR", "message": f"Invalid YAML: {e}"}]

    findings = []
    for check in SECURITY_CHECKS:
        # If the check's mitigation pattern appears anywhere in the file,
        # the whole check is suppressed.
        if "exclude_if" in check and re.search(check["exclude_if"], content):
            continue
        for match in re.finditer(check["pattern"], content, re.MULTILINE):
            findings.append(
                {
                    "name": check["name"],
                    "severity": check["severity"],
                    "message": check["message"],
                    # 1-based line number of the match start.
                    "line": content[: match.start()].count("\n") + 1,
                    "match": match.group(0)[:80],  # First 80 chars
                }
            )
    return findings
def main():
    """Validate each workflow file given on the command line."""
    paths = sys.argv[1:]
    if not paths:
        print("No workflow files to validate")
        return 0

    severity_symbols = {"HIGH": "🔴", "MEDIUM": "🟡", "LOW": "🔵", "ERROR": ""}
    blocked = False
    total_findings = 0

    for path in paths:
        findings = check_workflow_file(path)
        if not findings:
            continue
        total_findings += len(findings)

        print(f"\n{'=' * 60}")
        print(f"Workflow security issues in: {path}")
        print("=" * 60)
        for finding in findings:
            severity = finding.get("severity", "UNKNOWN")
            symbol = severity_symbols.get(severity, "")
            print(f"\n{symbol} [{severity}] {finding.get('name', 'Issue')}")
            print(f" Line: {finding.get('line', 'N/A')}")
            print(f" {finding['message']}")
            if "match" in finding:
                print(f" Match: {finding['match']}")
            # HIGH findings and ERROR entries (unreadable/invalid files) block.
            if severity in ("HIGH", "ERROR"):
                blocked = True

    if total_findings > 0:
        print(f"\n{'=' * 60}")
        print(f"Total findings: {total_findings}")
        print("=" * 60)
    if blocked:
        print("\n❌ COMMIT BLOCKED: Critical workflow security issues found")
        print("Please fix the issues above before committing.")
        print("\nSee SECURITY.md for workflow security best practices.")
        return 1
    if total_findings > 0:
        print("\n⚠️ Medium severity issues found - review recommended")
    return 0
if __name__ == "__main__":

View File

@@ -0,0 +1,174 @@
#!/usr/bin/env python3
"""Safe Event Dispatcher for Workflow Integration
This module provides a secure wrapper for dispatching webhook events from
CI/CD workflows. It validates inputs, sanitizes data, and prevents common
security issues.
Usage:
python safe_dispatch.py issue_comment owner/repo '{"action": "created", ...}'
Security Features:
- Input validation and sanitization
- Repository format validation
- Event data size limits
- No direct environment variable exposure
- Comprehensive error handling
"""
import json
import logging
import os
import sys
from typing import NoReturn
# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from agents.chat_agent import ChatAgent
from agents.codebase_agent import CodebaseAgent
from agents.issue_agent import IssueAgent
from agents.pr_agent import PRAgent
from dispatcher import get_dispatcher
from utils.webhook_sanitizer import (
extract_minimal_context,
sanitize_webhook_data,
validate_repository_format,
)
# Maximum event data size (10MB)
MAX_EVENT_SIZE = 10 * 1024 * 1024

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def setup_dispatcher():
    """Build the shared dispatcher with every known agent registered."""
    dispatcher = get_dispatcher()
    # Registration order matches the original: PR, Issue, Chat, Codebase.
    for agent in (PRAgent(), IssueAgent(), ChatAgent(), CodebaseAgent()):
        dispatcher.register_agent(agent)
    return dispatcher


def load_event_data(event_json: str) -> dict:
    """Parse and validate a JSON event payload.

    Args:
        event_json: JSON string containing event data

    Returns:
        Parsed and validated event data

    Raises:
        ValueError: If the payload is oversized, malformed, or not an object
    """
    # Reject oversized payloads before handing them to the JSON parser.
    if len(event_json) > MAX_EVENT_SIZE:
        raise ValueError(
            f"Event data too large: {len(event_json)} bytes (max: {MAX_EVENT_SIZE})"
        )

    try:
        parsed = json.loads(event_json)
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON: {e}") from e

    if not isinstance(parsed, dict):
        raise ValueError("Event data must be a JSON object")
    return parsed
def safe_dispatch(event_type: str, repository: str, event_json: str) -> int:
    """Safely dispatch a webhook event.

    Args:
        event_type: Type of event (issue_comment, pull_request, etc.)
        repository: Repository in format "owner/repo"
        event_json: JSON string containing event data

    Returns:
        Exit code (0 for success, 1 for error)
    """
    try:
        owner, repo = validate_repository_format(repository)
        logger.info(f"Dispatching {event_type} for {owner}/{repo}")

        # Parse, strip sensitive fields, then reduce to the minimal context
        # agents actually need (smaller attack surface).
        event_data = load_event_data(event_json)
        sanitized = sanitize_webhook_data(event_data)
        minimal_data = extract_minimal_context(event_type, sanitized)
        logger.debug(f"Event data: {json.dumps(minimal_data, indent=2)[:500]}...")

        # Agents will fetch full data from the API if they need it.
        result = setup_dispatcher().dispatch(
            event_type=event_type,
            event_data=minimal_data,
            owner=owner,
            repo=repo,
        )

        logger.info(f"Agents run: {result.agents_run}")
        for i, agent_result in enumerate(result.results):
            status = "" if agent_result.success else ""
            logger.info(f" {status} {result.agents_run[i]}: {agent_result.message}")

        # Any per-agent error fails the dispatch overall.
        if result.errors:
            logger.error("Errors occurred during dispatch:")
            for error in result.errors:
                logger.error(f" - {error}")
            return 1
        return 0

    except ValueError as e:
        logger.error(f"Validation error: {e}")
        return 1
    except Exception as e:
        logger.exception(f"Unexpected error during dispatch: {e}")
        return 1
def main() -> NoReturn:
    """CLI entry point: safe_dispatch.py <event_type> <owner/repo> <event_json>."""
    if len(sys.argv) != 4:
        print("Usage: safe_dispatch.py <event_type> <owner/repo> <event_json>")
        print()
        print("Example:")
        print(
            ' safe_dispatch.py issue_comment owner/repo \'{"action": "created", ...}\''
        )
        sys.exit(1)

    event_type, repository, event_json = sys.argv[1:4]
    sys.exit(safe_dispatch(event_type, repository, event_json))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,252 @@
"""Webhook Data Sanitization Utilities
This module provides utilities to sanitize webhook event data before
passing it to agents or storing it in environment variables. This helps
prevent sensitive information exposure in logs and environment dumps.
Security Features:
- Removes sensitive fields from webhook payloads
- Validates input structure
- Provides logging-safe versions of data
"""
import copy
import logging
from typing import Any
logger = logging.getLogger(__name__)

# Keys whose values are replaced with "[REDACTED]" wherever they appear
# (matched case-insensitively on the lowercased key).
SENSITIVE_FIELDS = {
    # User data
    "email",
    "private_email",
    "email_addresses",
    # Authentication & tokens
    "token",
    "access_token",
    "refresh_token",
    "api_key",
    "secret",
    "password",
    "private_key",
    "ssh_key",
    # Personal info
    "phone",
    "phone_number",
    "address",
    "ssn",
    "credit_card",
    # Internal identifiers that might be sensitive
    "installation_id",
    "node_id",
}

# Large free-text fields are truncated to this many characters.
REDACT_FIELDS = {
    "body": 500,  # Keep first 500 chars only
    "description": 500,
    "message": 500,
}


def _sanitize_item(value: Any, max_depth: int) -> Any:
    """Sanitize one nested value: recurse into dicts AND lists.

    The original implementation only recursed into dicts that were direct
    list items, so a list-of-lists could smuggle sensitive fields through
    unsanitized. This helper closes that gap.
    """
    if isinstance(value, dict):
        return sanitize_webhook_data(value, max_depth)
    if isinstance(value, list):
        if max_depth <= 0:
            logger.warning("Max recursion depth reached during sanitization")
            return []
        return [_sanitize_item(item, max_depth - 1) for item in value]
    return value


def sanitize_webhook_data(data: dict, max_depth: int = 10) -> dict:
    """Sanitize webhook data by redacting sensitive fields.

    Values of sensitive keys are replaced with "[REDACTED]" (the keys are
    kept), large text fields are truncated, and nested dicts/lists are
    sanitized recursively. The input is not mutated.

    Args:
        data: Webhook event data to sanitize
        max_depth: Maximum recursion depth (prevents infinite loops)

    Returns:
        Sanitized copy of the data

    Example:
        >>> event = {"user": {"email": "secret@example.com"}}
        >>> sanitize_webhook_data(event)["user"]["email"]
        '[REDACTED]'
    """
    if max_depth <= 0:
        logger.warning("Max recursion depth reached during sanitization")
        return {}
    if not isinstance(data, dict):
        return data

    sanitized: dict = {}
    for key, value in data.items():
        # Replace sensitive values (the key survives, the value does not).
        if key.lower() in SENSITIVE_FIELDS:
            sanitized[key] = "[REDACTED]"
        elif key in REDACT_FIELDS and isinstance(value, str):
            # Truncate large free-text fields.
            max_len = REDACT_FIELDS[key]
            if len(value) > max_len:
                sanitized[key] = value[:max_len] + "... [TRUNCATED]"
            else:
                sanitized[key] = value
        else:
            sanitized[key] = _sanitize_item(value, max_depth - 1)
    return sanitized
def extract_minimal_context(event_type: str, event_data: dict) -> dict:
    """Extract only the minimal necessary data for workflow dispatch.

    Builds a minimal payload containing just the fields agents need,
    reducing the attack surface of data passed between workflow steps.

    Args:
        event_type: Type of webhook event
        event_data: Full webhook payload

    Returns:
        Minimal safe payload
    """

    def _label_names(container: dict) -> list[dict]:
        # Keep only each label's name, dropping colors/ids/etc.
        return [{"name": lbl.get("name")} for lbl in container.get("labels", [])]

    minimal: dict = {"action": event_data.get("action")}

    if event_type == "issue_comment":
        issue = event_data.get("issue", {})
        comment = event_data.get("comment", {})
        minimal["issue"] = {
            "number": issue.get("number"),
            "title": issue.get("title", "")[:200],  # Truncate title
            "state": issue.get("state"),
            # Just the reference, not full PR data.
            "pull_request": issue.get("pull_request"),
            "labels": _label_names(issue),
        }
        minimal["comment"] = {
            "id": comment.get("id"),
            "body": comment.get("body", "")[:2000],  # Truncate to 2KB
            "user": {"login": comment.get("user", {}).get("login")},
        }
    elif event_type == "pull_request":
        pr = event_data.get("pull_request", {})
        head, base = pr.get("head", {}), pr.get("base", {})
        minimal["pull_request"] = {
            "number": pr.get("number"),
            "title": pr.get("title", "")[:200],
            "state": pr.get("state"),
            "head": {"ref": head.get("ref"), "sha": head.get("sha")},
            "base": {"ref": base.get("ref"), "sha": base.get("sha")},
        }
    elif event_type == "issues":
        issue = event_data.get("issue", {})
        minimal["issue"] = {
            "number": issue.get("number"),
            "title": issue.get("title", "")[:200],
            "state": issue.get("state"),
            "labels": _label_names(issue),
        }

    return minimal
def validate_repository_format(repo: str) -> tuple[str, str]:
    """Validate and parse a repository string of the form "owner/repo".

    Args:
        repo: Repository in format "owner/repo"

    Returns:
        Tuple of (owner, repo_name)

    Raises:
        ValueError: If format is invalid or contains dangerous characters
    """
    if not repo or not isinstance(repo, str):
        raise ValueError("Repository must be a non-empty string")

    # Exactly one slash separating owner and repo name.
    owner, sep, repo_name = repo.partition("/")
    if not sep or "/" in repo_name:
        raise ValueError(f"Invalid repository format: '{repo}'. Expected 'owner/repo'")
    if not owner or not repo_name:
        raise ValueError("Owner and repository name cannot be empty")

    # Reject path traversal attempts.
    if ".." in owner or ".." in repo_name:
        raise ValueError("Path traversal detected in repository name")

    # Reject shell metacharacters that could enable injection.
    for char in ";|&$`(){}[]<>":
        if char in owner or char in repo_name:
            raise ValueError(f"Invalid character '{char}' in repository name")

    return owner, repo_name
def validate_webhook_signature(payload: str, signature: str, secret: str) -> bool:
    """Validate webhook signature (for future GitHub webhook integration).

    Args:
        payload: Raw webhook payload
        signature: Signature from webhook header
        secret: Webhook secret

    Returns:
        True if signature is valid
    """
    import hashlib
    import hmac

    if not secret or not signature:
        return False

    # GitHub prefixes the hex digest with its algorithm name.
    algorithms = {"sha256=": hashlib.sha256, "sha1=": hashlib.sha1}
    for prefix, hash_func in algorithms.items():
        if signature.startswith(prefix):
            expected = hmac.new(secret.encode(), payload.encode(), hash_func).hexdigest()
            # Constant-time comparison prevents timing attacks.
            return hmac.compare_digest(expected, signature[len(prefix):])

    # Unknown or missing algorithm prefix.
    return False