"""Webhook Data Sanitization Utilities

This module provides utilities to sanitize webhook event data before passing
it to agents or storing it in environment variables. This helps prevent
sensitive information exposure in logs and environment dumps.

Security Features:
- Redacts sensitive fields in webhook payloads
- Validates input structure
- Provides logging-safe versions of data
"""

import copy
import hashlib
import hmac
import logging
import re
from typing import Any, Union

logger = logging.getLogger(__name__)

# Fields whose values are replaced with "[REDACTED]" when webhook data is
# sanitized (matched case-insensitively against the key name).
SENSITIVE_FIELDS = {
    # User data
    "email",
    "private_email",
    "email_addresses",
    # Authentication & tokens
    "token",
    "access_token",
    "refresh_token",
    "api_key",
    "secret",
    "password",
    "private_key",
    "ssh_key",
    # Personal info
    "phone",
    "phone_number",
    "address",
    "ssn",
    "credit_card",
    # Internal identifiers that might be sensitive
    "installation_id",
    "node_id",
}

# Fields to keep only minimal info (redact most content).
# Maps field name -> maximum number of characters kept.
REDACT_FIELDS = {
    "body": 500,  # Keep first 500 chars only
    "description": 500,
    "message": 500,
}

# Characters rejected in repository components with a dedicated error message.
_SHELL_METACHARACTERS = (
    ";", "|", "&", "$", "`", "(", ")", "{", "}", "[", "]", "<", ">",
)

# Allowlist for a single "owner" or "repo" component: ASCII letters, digits,
# dot, underscore and dash. Used with fullmatch() so a trailing newline is
# rejected as well.
_REPO_COMPONENT_RE = re.compile(r"[A-Za-z0-9._-]+")

# Signature header prefix -> hash constructor. SHA-1 is kept only because the
# legacy signature header uses it; SHA-256 is the preferred algorithm.
_SIGNATURE_ALGORITHMS = {
    "sha256": hashlib.sha256,
    "sha1": hashlib.sha1,
}


def _sanitize_list(items: list, depth: int) -> list:
    """Sanitize every dict reachable through *items*, including nested lists.

    Dict items are sanitized with the same depth budget as the list itself;
    each additional level of list nesting consumes one unit of the budget so
    recursion stays bounded.
    """
    cleaned = []
    for item in items:
        if isinstance(item, dict):
            cleaned.append(sanitize_webhook_data(item, depth))
        elif isinstance(item, list):
            if depth <= 0:
                logger.warning("Max recursion depth reached during sanitization")
                cleaned.append([])
            else:
                cleaned.append(_sanitize_list(item, depth - 1))
        else:
            cleaned.append(item)
    return cleaned


def sanitize_webhook_data(data: dict, max_depth: int = 10) -> dict:
    """Sanitize webhook data by redacting sensitive fields.

    Values of sensitive fields are replaced with ``"[REDACTED]"`` (the key is
    kept), and large text fields are truncated, to prevent accidental exposure
    in logs or environment variables. The input is not modified.

    Args:
        data: Webhook event data to sanitize. Non-dict input is returned
            unchanged.
        max_depth: Maximum recursion depth (prevents infinite loops). When
            the budget is exhausted the nested dict is replaced by ``{}``.

    Returns:
        Sanitized copy of the data

    Example:
        >>> event = {"issue": {"body": "." * 1000,
        ...                    "user": {"email": "secret@example.com"}}}
        >>> clean = sanitize_webhook_data(event)
        >>> clean["issue"]["user"]["email"]
        '[REDACTED]'
        >>> "secret@example.com" in str(clean)
        False
    """
    if max_depth <= 0:
        logger.warning("Max recursion depth reached during sanitization")
        return {}

    if not isinstance(data, dict):
        return data

    sanitized = {}
    for key, value in data.items():
        # Redact sensitive values but keep the key so the payload shape is
        # preserved. Non-string keys cannot name a sensitive field.
        if isinstance(key, str) and key.lower() in SENSITIVE_FIELDS:
            sanitized[key] = "[REDACTED]"
            continue

        # Truncate large text fields
        if key in REDACT_FIELDS and isinstance(value, str):
            max_len = REDACT_FIELDS[key]
            if len(value) > max_len:
                sanitized[key] = value[:max_len] + "... [TRUNCATED]"
            else:
                sanitized[key] = value
            continue

        # Recursively sanitize nested containers
        if isinstance(value, dict):
            sanitized[key] = sanitize_webhook_data(value, max_depth - 1)
        elif isinstance(value, list):
            sanitized[key] = _sanitize_list(value, max_depth - 1)
        else:
            sanitized[key] = value

    return sanitized


def _dict_or_empty(value: Any) -> dict:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _clip(value: Any, limit: int) -> str:
    """Return the first *limit* characters of *value*, or "" if not a string."""
    return value[:limit] if isinstance(value, str) else ""


def _label_names(issue: dict) -> list:
    """Reduce an issue's labels to a list of ``{"name": ...}`` dicts."""
    labels = issue.get("labels")
    if not isinstance(labels, (list, tuple)):
        return []
    return [
        {"name": label.get("name")} for label in labels if isinstance(label, dict)
    ]


def extract_minimal_context(event_type: str, event_data: dict) -> dict:
    """Extract only the minimal necessary data for workflow dispatch.

    This creates a minimal payload with just the essential fields needed for
    agent dispatch, reducing the attack surface. Fields that are missing or
    explicitly ``null`` in the payload are treated the same way, so a payload
    such as ``{"issue": {"title": None}}`` does not raise.

    Args:
        event_type: Type of webhook event. ``"issue_comment"``,
            ``"pull_request"`` and ``"issues"`` get event-specific fields;
            any other type yields only ``{"action": ...}``.
        event_data: Full webhook payload

    Returns:
        Minimal safe payload
    """
    event_data = _dict_or_empty(event_data)
    minimal = {
        "action": event_data.get("action"),
    }

    if event_type == "issue_comment":
        issue = _dict_or_empty(event_data.get("issue"))
        comment = _dict_or_empty(event_data.get("comment"))
        minimal["issue"] = {
            "number": issue.get("number"),
            "title": _clip(issue.get("title"), 200),  # Truncate title
            "state": issue.get("state"),
            # Just the reference, not full data
            "pull_request": issue.get("pull_request"),
            "labels": _label_names(issue),
        }
        minimal["comment"] = {
            "id": comment.get("id"),
            "body": _clip(comment.get("body"), 2000),  # Truncate to 2KB
            "user": {
                "login": _dict_or_empty(comment.get("user")).get("login"),
            },
        }
    elif event_type == "pull_request":
        pr = _dict_or_empty(event_data.get("pull_request"))
        head = _dict_or_empty(pr.get("head"))
        base = _dict_or_empty(pr.get("base"))
        minimal["pull_request"] = {
            "number": pr.get("number"),
            "title": _clip(pr.get("title"), 200),
            "state": pr.get("state"),
            "head": {
                "ref": head.get("ref"),
                "sha": head.get("sha"),
            },
            "base": {
                "ref": base.get("ref"),
                "sha": base.get("sha"),
            },
        }
    elif event_type == "issues":
        issue = _dict_or_empty(event_data.get("issue"))
        minimal["issue"] = {
            "number": issue.get("number"),
            "title": _clip(issue.get("title"), 200),
            "state": issue.get("state"),
            "labels": _label_names(issue),
        }

    return minimal


def validate_repository_format(repo: str) -> tuple[str, str]:
    """Validate and parse repository string.

    Args:
        repo: Repository in format "owner/repo"

    Returns:
        Tuple of (owner, repo_name)

    Raises:
        ValueError: If the value is empty, is not exactly "owner/repo", or
            either component contains ".." or any character other than ASCII
            letters, digits, ".", "_" and "-".
    """
    if not repo or not isinstance(repo, str):
        raise ValueError("Repository must be a non-empty string")

    parts = repo.split("/")
    if len(parts) != 2:
        raise ValueError(f"Invalid repository format: '{repo}'. Expected 'owner/repo'")

    owner, repo_name = parts

    if not owner or not repo_name:
        raise ValueError("Owner and repository name cannot be empty")

    # Check for path traversal attempts
    if ".." in owner or ".." in repo_name:
        raise ValueError("Path traversal detected in repository name")

    # Check for shell injection attempts
    for char in _SHELL_METACHARACTERS:
        if char in owner or char in repo_name:
            raise ValueError(f"Invalid character '{char}' in repository name")

    # Allowlist check: the denylist above misses whitespace, newlines, quotes,
    # backslashes and other characters that are just as dangerous in a shell
    # or a path, so anything outside the allowed set is rejected too.
    for component in (owner, repo_name):
        if _REPO_COMPONENT_RE.fullmatch(component) is None:
            offending = next(
                ch for ch in component if _REPO_COMPONENT_RE.fullmatch(ch) is None
            )
            raise ValueError(f"Invalid character {offending!r} in repository name")

    return owner, repo_name


def _to_bytes(value: Union[str, bytes]) -> bytes:
    """Return *value* as bytes, UTF-8 encoding it when it is a string."""
    return value.encode("utf-8") if isinstance(value, str) else bytes(value)


def validate_webhook_signature(
    payload: Union[str, bytes], signature: str, secret: Union[str, bytes]
) -> bool:
    """Validate webhook signature (for future GitHub webhook integration).

    Args:
        payload: Raw webhook payload. Raw request bytes are preferred; a
            string is UTF-8 encoded before hashing.
        signature: Signature from webhook header, in the form
            ``sha256=<hex>`` or ``sha1=<hex>``
        secret: Webhook secret

    Returns:
        True if signature is valid; False for a missing secret or signature,
        an unknown algorithm prefix, or a digest mismatch.
    """
    if not secret or not signature:
        return False

    # GitHub uses sha256= or sha1=
    algorithm, separator, provided_digest = signature.partition("=")
    hash_func = _SIGNATURE_ALGORITHMS.get(algorithm) if separator else None
    if hash_func is None:
        return False

    expected = hmac.new(_to_bytes(secret), _to_bytes(payload), hash_func).hexdigest()

    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, which would let a malformed header
    # crash the caller instead of simply failing validation.
    return hmac.compare_digest(
        expected.encode("ascii"), provided_digest.encode("utf-8")
    )