Files
openrabbit/tools/ai-review/utils/webhook_sanitizer.py
latte f94d21580c
All checks were successful
Enterprise AI Code Review / ai-review (pull_request) Successful in 26s
security fixes
2025-12-28 19:55:05 +00:00

253 lines
7.2 KiB
Python

"""Webhook Data Sanitization Utilities
This module provides utilities to sanitize webhook event data before
passing it to agents or storing it in environment variables. This helps
prevent sensitive information exposure in logs and environment dumps.
Security Features:
- Removes sensitive fields from webhook payloads
- Validates input structure
- Provides logging-safe versions of data
"""
import copy
import logging
from typing import Any
logger = logging.getLogger(__name__)

# Keys whose values are replaced with "[REDACTED]" by sanitize_webhook_data.
# Matching is case-insensitive (the key is lower-cased before lookup).
SENSITIVE_FIELDS = {
    # User data
    "email",
    "private_email",
    "email_addresses",
    # Authentication & tokens
    "token",
    "access_token",
    "refresh_token",
    "api_key",
    "secret",
    "password",
    "private_key",
    "ssh_key",
    # Personal info
    "phone",
    "phone_number",
    "address",
    "ssn",
    "credit_card",
    # Internal identifiers that might be sensitive
    "installation_id",
    "node_id",
}

# Free-text keys mapped to the maximum number of characters kept; longer
# string values are cut and suffixed with "... [TRUNCATED]".
REDACT_FIELDS = {
    "body": 500,
    "description": 500,
    "message": 500,
}


def _sanitize_list(items: list, max_depth: int) -> list:
    """Return a sanitized copy of *items*, descending into dicts and lists.

    Dict items are sanitized with the given depth budget; a list nested
    directly inside another list consumes one level of that budget so that
    pathological nesting cannot recurse without bound.
    """
    cleaned = []
    for item in items:
        if isinstance(item, dict):
            cleaned.append(sanitize_webhook_data(item, max_depth))
        elif isinstance(item, list):
            if max_depth <= 0:
                logger.warning("Max recursion depth reached during sanitization")
                cleaned.append([])
            else:
                cleaned.append(_sanitize_list(item, max_depth - 1))
        else:
            cleaned.append(item)
    return cleaned


def sanitize_webhook_data(data: dict, max_depth: int = 10) -> dict:
    """Sanitize webhook data by redacting sensitive fields.

    Values stored under a key listed in SENSITIVE_FIELDS (compared
    case-insensitively) are replaced with the placeholder "[REDACTED]";
    the key itself is kept. String values under a key listed in
    REDACT_FIELDS are truncated to the configured length. Nested dicts and
    lists (including lists nested inside lists) are processed recursively.
    The input is never mutated.

    Args:
        data: Webhook event data to sanitize. A non-dict value is returned
            unchanged (provided the depth budget is not exhausted).
        max_depth: Maximum recursion depth (prevents runaway recursion).
            When it reaches zero an empty dict is returned and a warning
            is logged.

    Returns:
        Sanitized copy of the data.

    Example:
        >>> event = {"issue": {"body": "x" * 1000, "user": {"email": "secret@example.com"}}}
        >>> clean = sanitize_webhook_data(event)
        >>> clean["issue"]["user"]["email"]
        '[REDACTED]'
        >>> "secret@example.com" in str(clean)
        False
    """
    if max_depth <= 0:
        logger.warning("Max recursion depth reached during sanitization")
        return {}
    if not isinstance(data, dict):
        return data

    sanitized = {}
    for key, value in data.items():
        # Replace the value of sensitive fields with a placeholder. Only
        # string keys can match; other key types are passed through instead
        # of crashing on `.lower()`.
        if isinstance(key, str) and key.lower() in SENSITIVE_FIELDS:
            sanitized[key] = "[REDACTED]"
            continue

        # Truncate large text fields.
        if key in REDACT_FIELDS and isinstance(value, str):
            max_len = REDACT_FIELDS[key]
            if len(value) > max_len:
                sanitized[key] = value[:max_len] + "... [TRUNCATED]"
            else:
                sanitized[key] = value
            continue

        # Recurse into containers; scalars are copied as-is.
        if isinstance(value, dict):
            sanitized[key] = sanitize_webhook_data(value, max_depth - 1)
        elif isinstance(value, list):
            sanitized[key] = _sanitize_list(value, max_depth - 1)
        else:
            sanitized[key] = value
    return sanitized
def _minimal_labels(issue: dict) -> list:
    """Return ``[{"name": ...}]`` for each dict-shaped label on *issue*."""
    return [
        {"name": label.get("name")}
        for label in (issue.get("labels") or [])
        if isinstance(label, dict)
    ]


def extract_minimal_context(event_type: str, event_data: dict) -> dict:
    """Extract only the minimal necessary data for workflow dispatch.

    This creates a minimal payload with just the essential fields needed
    for agent dispatch, reducing the attack surface. Fields that are
    missing *or* explicitly null in the payload are treated the same way:
    nested objects become empty, text becomes "" and labels become [].

    Args:
        event_type: Type of webhook event. "issue_comment", "pull_request"
            and "issues" get event-specific fields; any other value yields
            only the "action" key.
        event_data: Full webhook payload.

    Returns:
        Minimal safe payload.
    """
    minimal = {
        "action": event_data.get("action"),
    }

    if event_type == "issue_comment":
        # `or {}` (rather than a .get default) also covers JSON null values.
        issue = event_data.get("issue") or {}
        comment = event_data.get("comment") or {}
        minimal["issue"] = {
            "number": issue.get("number"),
            "title": (issue.get("title") or "")[:200],  # Truncate title
            "state": issue.get("state"),
            # Just the reference, not full data
            "pull_request": issue.get("pull_request"),
            "labels": _minimal_labels(issue),
        }
        minimal["comment"] = {
            "id": comment.get("id"),
            "body": (comment.get("body") or "")[:2000],  # Truncate to 2000 chars
            "user": {
                "login": (comment.get("user") or {}).get("login"),
            },
        }
    elif event_type == "pull_request":
        pr = event_data.get("pull_request") or {}
        head = pr.get("head") or {}
        base = pr.get("base") or {}
        minimal["pull_request"] = {
            "number": pr.get("number"),
            "title": (pr.get("title") or "")[:200],
            "state": pr.get("state"),
            "head": {
                "ref": head.get("ref"),
                "sha": head.get("sha"),
            },
            "base": {
                "ref": base.get("ref"),
                "sha": base.get("sha"),
            },
        }
    elif event_type == "issues":
        issue = event_data.get("issue") or {}
        minimal["issue"] = {
            "number": issue.get("number"),
            "title": (issue.get("title") or "")[:200],
            "state": issue.get("state"),
            "labels": _minimal_labels(issue),
        }
    return minimal
def validate_repository_format(repo: str) -> tuple[str, str]:
    """Validate and parse repository string.

    The string must be exactly ``owner/repo``. Both parts must be non-empty,
    must not contain ``..``, and may only consist of ASCII letters, digits,
    dash, underscore and dot. Whitespace, control characters, quotes,
    backslashes and shell metacharacters are all rejected.

    Args:
        repo: Repository in format "owner/repo"

    Returns:
        Tuple of (owner, repo_name)

    Raises:
        ValueError: If format is invalid
    """
    if not repo or not isinstance(repo, str):
        raise ValueError("Repository must be a non-empty string")

    parts = repo.split("/")
    if len(parts) != 2:
        raise ValueError(f"Invalid repository format: '{repo}'. Expected 'owner/repo'")
    owner, repo_name = parts

    if not owner or not repo_name:
        raise ValueError("Owner and repository name cannot be empty")

    # Check for path traversal attempts
    if ".." in owner or ".." in repo_name:
        raise ValueError("Path traversal detected in repository name")

    # Check for shell injection attempts first so these well-known
    # metacharacters keep their established error message.
    dangerous_chars = [";", "|", "&", "$", "`", "(", ")", "{", "}", "[", "]", "<", ">"]
    for char in dangerous_chars:
        if char in owner or char in repo_name:
            raise ValueError(f"Invalid character '{char}' in repository name")

    # Allowlist: a denylist alone lets through whitespace, newlines, quotes,
    # backslashes, globbing characters and non-ASCII look-alikes.
    allowed_punctuation = "-_."
    for part in (owner, repo_name):
        for char in part:
            if not (char.isascii() and (char.isalnum() or char in allowed_punctuation)):
                # repr() keeps control characters from being written raw
                # into logs or error output.
                raise ValueError(f"Invalid character {char!r} in repository name")

    return owner, repo_name
def validate_webhook_signature(payload: str, signature: str, secret: str) -> bool:
    """Validate webhook signature (for future GitHub webhook integration).

    The signature header must have the form ``sha256=<hex>`` or
    ``sha1=<hex>``. The HMAC of the payload is computed with the matching
    hash function and compared to the supplied hex digest in constant time.
    Malformed signatures (wrong type, unknown prefix, non-ASCII content)
    yield ``False`` rather than raising.

    Args:
        payload: Raw webhook payload. ``bytes`` is also accepted so the
            unmodified request body can be passed directly.
        signature: Signature from webhook header
        secret: Webhook secret (``str`` or ``bytes``)

    Returns:
        True if signature is valid
    """
    import hmac
    import hashlib

    if not secret or not signature:
        return False
    if not isinstance(signature, str):
        return False

    # GitHub uses sha256=<signature> or sha1=<signature>
    if signature.startswith("sha256="):
        hash_func = hashlib.sha256
        signature = signature[7:]
    elif signature.startswith("sha1="):
        hash_func = hashlib.sha1
        signature = signature[5:]
    else:
        return False

    key = secret.encode() if isinstance(secret, str) else secret
    message = payload.encode() if isinstance(payload, str) else payload
    expected = hmac.new(key, message, hash_func).hexdigest()

    # Compare as bytes: compare_digest raises TypeError for str arguments
    # containing non-ASCII characters, and the header is attacker-controlled.
    return hmac.compare_digest(expected.encode(), signature.encode())