All checks were successful
Enterprise AI Code Review / ai-review (pull_request) Successful in 26s
253 lines
7.2 KiB
Python
253 lines
7.2 KiB
Python
"""Webhook Data Sanitization Utilities

This module provides utilities to sanitize webhook event data before
passing it to agents or storing it in environment variables. This helps
prevent sensitive information exposure in logs and environment dumps.

Security Features:
- Masks the values of sensitive fields in webhook payloads
- Validates input structure
- Provides logging-safe versions of data
"""
|
|
|
|
import copy
|
|
import logging
|
|
from typing import Any
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# Lower-case key names whose values must never reach logs or environment
# variables. sanitize_webhook_data() compares lower-cased payload keys against
# this set and replaces the matching values with a placeholder.
SENSITIVE_FIELDS = {
    # Contact details tied to a user account
    "email", "private_email", "email_addresses",
    # Credentials, tokens and key material
    "token", "access_token", "refresh_token", "api_key",
    "secret", "password", "private_key", "ssh_key",
    # Personally identifiable information
    "phone", "phone_number", "address", "ssn", "credit_card",
    # Internal identifiers that might be sensitive
    "installation_id", "node_id",
}
|
|
|
|
|
|
# Free-text fields that are kept but shortened: maps a key name to the maximum
# number of leading characters of its string value that are preserved.
REDACT_FIELDS = {"body": 500, "description": 500, "message": 500}
|
|
|
|
|
|
def sanitize_webhook_data(data: dict, max_depth: int = 10) -> dict:
    """Return a sanitized copy of webhook data with sensitive values masked.

    Keys are preserved. The value of every key listed in SENSITIVE_FIELDS
    (compared case-insensitively) is replaced with the string "[REDACTED]",
    and string values of keys listed in REDACT_FIELDS (also compared
    case-insensitively) are cut to the configured length. Nested dicts and
    lists, including lists nested inside lists, are processed recursively.
    The input is never mutated.

    Args:
        data: Webhook event data to sanitize. A non-dict value is returned
            unchanged (as long as max_depth is positive).
        max_depth: Maximum dict nesting depth to descend into. Once the
            budget is used up, the remaining nested dict is replaced by an
            empty dict and a warning is logged.

    Returns:
        Sanitized copy of the data.

    Example:
        >>> event = {"issue": {"body": "x" * 1000, "user": {"email": "secret@example.com"}}}
        >>> clean = sanitize_webhook_data(event)
        >>> clean["issue"]["user"]["email"]
        '[REDACTED]'
        >>> "secret@example.com" in str(clean)
        False
    """
    if max_depth <= 0:
        logger.warning("Max recursion depth reached during sanitization")
        return {}

    if not isinstance(data, dict):
        return data

    sanitized = {}

    for key, value in data.items():
        # Only string keys can be case-folded; any other hashable key is
        # compared as-is and therefore never matches a configured field name.
        lookup = key.lower() if isinstance(key, str) else key

        # Mask the value but keep the key so the payload shape is preserved
        if lookup in SENSITIVE_FIELDS:
            sanitized[key] = "[REDACTED]"
            continue

        # Truncate large text fields
        if lookup in REDACT_FIELDS and isinstance(value, str):
            max_len = REDACT_FIELDS[lookup]
            if len(value) > max_len:
                sanitized[key] = value[:max_len] + "... [TRUNCATED]"
            else:
                sanitized[key] = value
            continue

        # Recurse into containers; scalars are copied through unchanged
        if isinstance(value, dict):
            sanitized[key] = sanitize_webhook_data(value, max_depth - 1)
        elif isinstance(value, list):
            sanitized[key] = _sanitize_list(value, max_depth - 1)
        else:
            sanitized[key] = value

    return sanitized


def _sanitize_list(items: list, max_depth: int) -> list:
    """Sanitize every dict reachable through a (possibly nested) list.

    Dict items are sanitized with the given depth budget. A list nested
    directly inside another list consumes one level of the budget and is
    replaced by an empty list once the budget is exhausted, which keeps the
    recursion bounded.
    """
    cleaned = []
    for item in items:
        if isinstance(item, dict):
            cleaned.append(sanitize_webhook_data(item, max_depth))
        elif isinstance(item, list):
            if max_depth <= 0:
                logger.warning("Max recursion depth reached during sanitization")
                cleaned.append([])
            else:
                cleaned.append(_sanitize_list(item, max_depth - 1))
        else:
            cleaned.append(item)
    return cleaned
|
|
|
|
|
|
def extract_minimal_context(event_type: str, event_data: dict) -> dict:
    """Extract only the minimal necessary data for workflow dispatch.

    Builds a small payload containing just the fields needed for agent
    dispatch, reducing the attack surface. Fields that are missing or
    explicitly null in the payload are tolerated: nested objects are treated
    as empty and text fields become empty strings instead of raising.

    Args:
        event_type: Type of webhook event. "issue_comment", "pull_request"
            and "issues" get event-specific fields; any other type yields
            only the "action" field.
        event_data: Full webhook payload.

    Returns:
        Minimal safe payload.
    """
    event_data = _dict_or_empty(event_data)
    minimal = {"action": event_data.get("action")}

    if event_type == "issue_comment":
        comment = _dict_or_empty(event_data.get("comment"))
        minimal["issue"] = _minimal_issue(
            event_data.get("issue"), include_pull_request=True
        )
        minimal["comment"] = {
            "id": comment.get("id"),
            # Keep at most the first 2000 characters of the comment text
            "body": _clip_text(comment.get("body"), 2000),
            "user": {"login": _dict_or_empty(comment.get("user")).get("login")},
        }

    elif event_type == "pull_request":
        pr = _dict_or_empty(event_data.get("pull_request"))
        minimal["pull_request"] = {
            "number": pr.get("number"),
            "title": _clip_text(pr.get("title"), 200),
            "state": pr.get("state"),
            "head": _ref_and_sha(pr.get("head")),
            "base": _ref_and_sha(pr.get("base")),
        }

    elif event_type == "issues":
        minimal["issue"] = _minimal_issue(
            event_data.get("issue"), include_pull_request=False
        )

    return minimal


def _dict_or_empty(value) -> dict:
    """Return value if it is a dict, otherwise an empty dict (covers null)."""
    return value if isinstance(value, dict) else {}


def _clip_text(value, limit: int) -> str:
    """Return the first limit characters of a string; non-strings become ""."""
    return value[:limit] if isinstance(value, str) else ""


def _ref_and_sha(branch) -> dict:
    """Reduce a pull request head/base object to its ref and sha."""
    branch = _dict_or_empty(branch)
    return {"ref": branch.get("ref"), "sha": branch.get("sha")}


def _minimal_issue(issue, *, include_pull_request: bool) -> dict:
    """Reduce an issue object to number, title, state and label names."""
    issue = _dict_or_empty(issue)
    labels = issue.get("labels")
    summary = {
        "number": issue.get("number"),
        "title": _clip_text(issue.get("title"), 200),
        "state": issue.get("state"),
    }
    if include_pull_request:
        # Passed through as found in the payload; it is not reduced further
        summary["pull_request"] = issue.get("pull_request")
    summary["labels"] = [
        {"name": label.get("name")}
        for label in (labels if isinstance(labels, list) else [])
        if isinstance(label, dict)
    ]
    return summary
|
|
|
|
|
|
def validate_repository_format(repo: str) -> tuple[str, str]:
    """Validate and parse a repository string.

    The value is split on "/" into exactly two non-empty parts. Both parts
    are then rejected if they contain "..", a shell metacharacter, a quote,
    a backslash, whitespace or a non-printable character.

    Args:
        repo: Repository in format "owner/repo"

    Returns:
        Tuple of (owner, repo_name)

    Raises:
        ValueError: If format is invalid
    """
    if not repo or not isinstance(repo, str):
        raise ValueError("Repository must be a non-empty string")

    parts = repo.split("/")
    if len(parts) != 2:
        raise ValueError(f"Invalid repository format: '{repo}'. Expected 'owner/repo'")

    owner, repo_name = parts

    if not owner or not repo_name:
        raise ValueError("Owner and repository name cannot be empty")

    # Check for path traversal attempts
    if ".." in owner or ".." in repo_name:
        raise ValueError("Path traversal detected in repository name")

    # Check for shell injection attempts. Quotes and the backslash are
    # included because they can break out of a quoted shell argument.
    dangerous_chars = (
        ";", "|", "&", "$", "`", "(", ")", "{", "}", "[", "]", "<", ">",
        "'", '"', "\\",
    )
    for char in dangerous_chars:
        if char in owner or char in repo_name:
            raise ValueError(f"Invalid character '{char}' in repository name")

    # Whitespace splits shell words and a newline separates commands;
    # control characters have no place in a repository identifier either.
    for char in owner + repo_name:
        if char.isspace() or not char.isprintable():
            raise ValueError(f"Invalid character {char!r} in repository name")

    return owner, repo_name
|
|
|
|
|
|
def validate_webhook_signature(payload: str, signature: str, secret: str) -> bool:
    """Validate webhook signature (for future GitHub webhook integration).

    The signature header must have the form "sha256=<hex digest>" or
    "sha1=<hex digest>". The HMAC of the payload is recomputed with the
    shared secret and compared against the supplied digest in constant time.
    Malformed signatures never raise; they simply fail validation.

    Args:
        payload: Raw webhook payload. A str is UTF-8 encoded before hashing;
            bytes are hashed as received.
        signature: Signature from webhook header
        secret: Webhook secret (str or bytes)

    Returns:
        True if signature is valid
    """
    import hmac
    import hashlib

    if not secret or not signature:
        return False

    # GitHub uses sha256=<signature> or sha1=<signature>
    algorithms = {"sha256": hashlib.sha256, "sha1": hashlib.sha1}
    algorithm, separator, supplied_digest = signature.partition("=")
    hash_func = algorithms.get(algorithm)
    if not separator or hash_func is None:
        return False

    key = secret if isinstance(secret, (bytes, bytearray)) else secret.encode()
    message = payload if isinstance(payload, (bytes, bytearray)) else payload.encode()
    expected = hmac.new(key, message, hash_func).hexdigest()

    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, and the header is attacker-controlled.
    return hmac.compare_digest(expected.encode(), supplied_digest.encode())
|