Files
openrabbit/tools/ai-review/utils/safe_dispatch.py
latte f94d21580c
All checks were successful
Enterprise AI Code Review / ai-review (pull_request) Successful in 26s
security fixes
2025-12-28 19:55:05 +00:00

175 lines
4.9 KiB
Python
Executable File

#!/usr/bin/env python3
"""Safe Event Dispatcher for Workflow Integration
This module provides a secure wrapper for dispatching webhook events from
CI/CD workflows. It validates inputs, sanitizes data, and prevents common
security issues.
Usage:
python safe_dispatch.py issue_comment owner/repo '{"action": "created", ...}'
Security Features:
- Input validation and sanitization
- Repository format validation
- Event data size limits
- No direct environment variable exposure
- Comprehensive error handling
"""
import json
import logging
import os
import sys
from typing import NoReturn
# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from agents.chat_agent import ChatAgent
from agents.codebase_agent import CodebaseAgent
from agents.issue_agent import IssueAgent
from agents.pr_agent import PRAgent
from dispatcher import get_dispatcher
from utils.webhook_sanitizer import (
extract_minimal_context,
sanitize_webhook_data,
validate_repository_format,
)
# Upper bound on the accepted event payload size (10 * 1024 * 1024).
MAX_EVENT_SIZE = 10 * 1024 * 1024

# Configure root logging once at import time: INFO and above, with each
# record carrying a timestamp, the logger name and the level.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
def setup_dispatcher():
    """Obtain the dispatcher and attach every agent to it.

    Returns:
        The object returned by ``get_dispatcher()`` after one instance each
        of ``PRAgent``, ``IssueAgent``, ``ChatAgent`` and ``CodebaseAgent``
        has been passed to its ``register_agent`` method.
    """
    event_dispatcher = get_dispatcher()
    # Registration order: PR, issue, chat, codebase. Each agent is
    # instantiated immediately before it is registered.
    for agent_cls in (PRAgent, IssueAgent, ChatAgent, CodebaseAgent):
        event_dispatcher.register_agent(agent_cls())
    return event_dispatcher
def load_event_data(event_json: str) -> dict:
    """Parse an event payload and check that it is a JSON object.

    The size limit is enforced on the UTF-8 encoded length of the payload,
    so text made of multi-byte characters cannot slip past a limit that is
    expressed (and reported) in bytes.

    Args:
        event_json: JSON string containing event data.

    Returns:
        The parsed event data as a dictionary.

    Raises:
        ValueError: If the payload exceeds ``MAX_EVENT_SIZE`` bytes, is not
            valid JSON, is nested too deeply to parse, or its top-level
            value is not a JSON object.
    """
    # Check size before parsing. ``len()`` of a ``str`` counts code points,
    # not bytes, so measure the encoded form. "surrogatepass" keeps the
    # measurement from failing on lone surrogates (which command-line
    # argument decoding can produce); the JSON parser decides their fate.
    if isinstance(event_json, str):
        size_in_bytes = len(event_json.encode("utf-8", errors="surrogatepass"))
    else:
        # bytes/bytearray input is also accepted by json.loads; its length
        # is already a byte count.
        size_in_bytes = len(event_json)
    if size_in_bytes > MAX_EVENT_SIZE:
        raise ValueError(
            f"Event data too large: {size_in_bytes} bytes (max: {MAX_EVENT_SIZE})"
        )

    try:
        data = json.loads(event_json)
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON: {e}") from e
    except RecursionError as e:
        # Pathologically nested input exhausts the parser's recursion limit;
        # report it as invalid data, as the documented contract promises.
        raise ValueError("Invalid JSON: nesting too deep") from e

    if not isinstance(data, dict):
        raise ValueError("Event data must be a JSON object")
    return data
def safe_dispatch(event_type: str, repository: str, event_json: str) -> int:
    """Safely dispatch a webhook event.

    Validates the repository string, parses and sanitizes the event payload,
    reduces it to a minimal context, and hands that to the dispatcher. No
    exception propagates to the caller; every failure is logged and turned
    into a non-zero return value.

    Args:
        event_type: Type of event (issue_comment, pull_request, etc.).
        repository: Repository in format "owner/repo".
        event_json: JSON string containing event data.

    Returns:
        Exit code: 0 when dispatch completed and the dispatcher reported no
        errors, 1 on validation failure, dispatcher-reported errors, or any
        unexpected exception.
    """
    try:
        # Validate repository format
        owner, repo = validate_repository_format(repository)
        logger.info("Dispatching %s for %s/%s", event_type, owner, repo)

        # Load and validate event data
        event_data = load_event_data(event_json)

        # Sanitize event data to remove sensitive fields
        sanitized_data = sanitize_webhook_data(event_data)

        # Extract minimal context (reduces attack surface)
        minimal_data = extract_minimal_context(event_type, sanitized_data)

        # Log a truncated view of the sanitized data. Serialization is
        # skipped unless DEBUG is enabled, so the work (and any
        # serialization failure) cannot affect a normal run.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                "Event data: %s...", json.dumps(minimal_data, indent=2)[:500]
            )

        # Initialize dispatcher
        dispatcher = setup_dispatcher()

        # Dispatch event with sanitized data
        # Note: Agents will fetch full data from API if needed
        result = dispatcher.dispatch(
            event_type=event_type,
            event_data=minimal_data,
            owner=owner,
            repo=repo,
        )

        # Log results
        logger.info("Agents run: %s", result.agents_run)
        # Pair names with results via zip so that a length mismatch between
        # the two sequences cannot raise IndexError and turn a completed
        # dispatch into a failure.
        for agent_name, agent_result in zip(result.agents_run, result.results):
            # ASCII markers so the outcome is visible on any log encoding.
            status = "[OK]" if agent_result.success else "[FAILED]"
            logger.info(" %s %s: %s", status, agent_name, agent_result.message)

        # Return error code if any agents failed
        if result.errors:
            logger.error("Errors occurred during dispatch:")
            for error in result.errors:
                logger.error(" - %s", error)
            return 1
        return 0
    except ValueError as e:
        # Raised by load_event_data for bad input; logged without a traceback.
        logger.error("Validation error: %s", e)
        return 1
    except Exception as e:
        # Top-level boundary: record the traceback and report failure.
        logger.exception("Unexpected error during dispatch: %s", e)
        return 1
def main() -> NoReturn:
    """Command-line entry point.

    Expects exactly three arguments after the script name: the event type,
    the repository as "owner/repo", and the event payload as a JSON string.
    With any other argument count, a usage message is written to stderr and
    the process exits with status 1. Otherwise the process exits with the
    code returned by ``safe_dispatch``.
    """
    if len(sys.argv) != 4:
        usage = "\n".join(
            (
                "Usage: safe_dispatch.py <event_type> <owner/repo> <event_json>",
                "",
                "Example:",
                ' safe_dispatch.py issue_comment owner/repo \'{"action": "created", ...}\'',
            )
        )
        # Usage errors are diagnostics: send them to stderr so they do not
        # mix with any regular output on stdout.
        print(usage, file=sys.stderr)
        sys.exit(1)

    event_type, repository, event_json = sys.argv[1:]
    sys.exit(safe_dispatch(event_type, repository, event_json))


# Run the dispatcher only when executed as a script, not when imported.
if __name__ == "__main__":
    main()