Files
openrabbit/tools/ai-review/dispatcher.py
2025-12-21 13:42:30 +01:00

212 lines
5.8 KiB
Python

"""Event Dispatcher
Routes incoming webhook events to the agent handlers that accept them.
Handlers run sequentially within one dispatch; whole dispatches can be
submitted to a thread pool, which queues them while all workers are busy.
"""
import logging
import os
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass
from typing import Type

import yaml

from agents.base_agent import AgentContext, AgentResult, BaseAgent
@dataclass
class DispatchResult:
    """Outcome of routing one event through the dispatcher.

    Attributes:
        event_type: The event type that was dispatched.
        agents_run: Class names of the agents that were run, in run order.
        results: One ``AgentResult`` per agent that was run.
        errors: Human-readable ``"<AgentName>: <reason>"`` entries for agents
            that reported failure or raised.
    """

    # Field order is part of the positional constructor signature; keep it.
    event_type: str
    agents_run: list[str]
    results: list[AgentResult]
    errors: list[str]
class Dispatcher:
    """Event dispatcher that routes events to appropriate agents.

    Agents are registered up front. ``dispatch`` asks every registered agent
    whether it can handle an event and runs the ones that accept it, one after
    another, in registration order. ``dispatch_async`` submits a whole
    ``dispatch`` call to an internal thread pool.

    The dispatcher can be used as a context manager, in which case the thread
    pool is shut down when the ``with`` block exits.
    """

    def __init__(
        self,
        config: dict | None = None,
        max_workers: int = 4,
    ):
        """Initialize the dispatcher.

        Args:
            config: Configuration dictionary. A falsy value (``None`` or an
                empty dict) falls back to ``config.yml`` next to this module.
            max_workers: Size of the thread pool used by ``dispatch_async``,
                i.e. the maximum number of dispatches running concurrently.
        """
        self.config = config or self._load_config()
        self.max_workers = max_workers
        self.logger = logging.getLogger(__name__)
        self._agents: list[BaseAgent] = []
        self._executor = ThreadPoolExecutor(max_workers=max_workers)

    def __enter__(self) -> "Dispatcher":
        """Return the dispatcher itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Shut down the thread pool when leaving the ``with`` block."""
        self.shutdown()

    @staticmethod
    def _load_config() -> dict:
        """Load configuration from config.yml.

        Returns:
            The parsed YAML content, or an empty dict when the file is
            missing or contains no document.
        """
        config_path = os.path.join(os.path.dirname(__file__), "config.yml")
        if not os.path.exists(config_path):
            return {}
        with open(config_path, encoding="utf-8") as f:
            loaded = yaml.safe_load(f)
        # safe_load yields None for an empty file; callers expect a dict.
        return loaded or {}

    def register_agent(self, agent: BaseAgent) -> None:
        """Register an agent with the dispatcher.

        Args:
            agent: Agent instance to register.
        """
        self._agents.append(agent)
        self.logger.info("Registered agent: %s", agent.__class__.__name__)

    def register_agent_class(self, agent_class: Type[BaseAgent], **kwargs) -> None:
        """Register an agent class (will be instantiated).

        The class is instantiated with ``config=self.config`` plus any extra
        keyword arguments, and the instance is registered.

        Args:
            agent_class: Agent class to instantiate and register.
            **kwargs: Arguments to pass to agent constructor.
        """
        agent = agent_class(config=self.config, **kwargs)
        self.register_agent(agent)

    def dispatch(
        self,
        event_type: str,
        event_data: dict,
        owner: str,
        repo: str,
    ) -> DispatchResult:
        """Dispatch an event to registered agents.

        Agents are run sequentially. An exception raised by an agent's
        ``run`` is logged and recorded as a failed result; it does not stop
        the remaining agents. Exceptions from ``can_handle`` are not caught.

        Args:
            event_type: Type of event (issue, pull_request, issue_comment, etc).
            event_data: Event payload data.
            owner: Repository owner.
            repo: Repository name.

        Returns:
            Dispatch result with all agent results.
        """
        self.logger.info("Dispatching event: %s for %s/%s", event_type, owner, repo)

        # Find agents that can handle this event
        handlers = [
            agent for agent in self._agents if agent.can_handle(event_type, event_data)
        ]
        if not handlers:
            self.logger.info("No agents registered for event: %s", event_type)
            return DispatchResult(
                event_type=event_type,
                agents_run=[],
                results=[],
                errors=[],
            )

        self.logger.info(
            "Found %d agent(s) for event: %s",
            len(handlers),
            [a.__class__.__name__ for a in handlers],
        )

        # One context object is shared by every handler of this event
        context = AgentContext(
            owner=owner,
            repo=repo,
            event_type=event_type,
            event_data=event_data,
            config=self.config,
        )

        results: list[AgentResult] = []
        errors: list[str] = []
        agents_run: list[str] = []
        for agent in handlers:
            agent_name = agent.__class__.__name__
            agents_run.append(agent_name)
            try:
                result = agent.run(context)
                results.append(result)
                if not result.success:
                    errors.append(f"{agent_name}: {result.error or result.message}")
            except Exception as e:
                # Isolation boundary: one failing agent must not abort the rest.
                self.logger.exception("Agent %s failed: %s", agent_name, e)
                errors.append(f"{agent_name}: {e}")
                results.append(
                    AgentResult(
                        success=False,
                        message="Unexpected error",
                        error=str(e),
                    )
                )

        return DispatchResult(
            event_type=event_type,
            agents_run=agents_run,
            results=results,
            errors=errors,
        )

    def dispatch_async(
        self,
        event_type: str,
        event_data: dict,
        owner: str,
        repo: str,
    ) -> Future[DispatchResult]:
        """Dispatch an event asynchronously.

        Args:
            event_type: Type of event.
            event_data: Event payload data.
            owner: Repository owner.
            repo: Repository name.

        Returns:
            Future that resolves to DispatchResult.
        """
        return self._executor.submit(
            self.dispatch, event_type, event_data, owner, repo
        )

    def shutdown(self) -> None:
        """Shutdown the executor, waiting for submitted dispatches to finish."""
        self._executor.shutdown(wait=True)
# Process-wide dispatcher instance; stays None until first requested.
_dispatcher: Dispatcher | None = None


def get_dispatcher() -> Dispatcher:
    """Return the shared dispatcher, creating it on first use.

    The instance is built with ``Dispatcher()`` default arguments and cached
    in the module-level ``_dispatcher`` variable for subsequent calls.
    """
    global _dispatcher
    if _dispatcher is not None:
        return _dispatcher
    _dispatcher = Dispatcher()
    return _dispatcher
def dispatch_event(
    event_type: str,
    event_data: dict,
    owner: str,
    repo: str,
) -> DispatchResult:
    """Route an event through the shared global dispatcher.

    Convenience wrapper: obtains the dispatcher from ``get_dispatcher`` and
    forwards all arguments to its ``dispatch`` method.

    Args:
        event_type: Type of event.
        event_data: Event payload data.
        owner: Repository owner.
        repo: Repository name.

    Returns:
        Dispatch result.
    """
    dispatcher = get_dispatcher()
    outcome = dispatcher.dispatch(event_type, event_data, owner, repo)
    return outcome