Files
latte e8d28225e0
All checks were successful
AI Codebase Quality Review / ai-codebase-review (push) Successful in 39s
just why not
2026-01-07 21:19:46 +01:00

543 lines
16 KiB
Python

"""Notification System
Provides webhook-based notifications for Slack, Discord, and other platforms.
Supports critical security findings, review summaries, and custom alerts.
"""
import logging
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any
import requests
class NotificationLevel(Enum):
    """Severity attached to a notification.

    Each member's value is the lowercase string form of its name; members
    are declared from least to most urgent.
    """

    INFO = "info"          # routine, informational
    WARNING = "warning"    # needs attention
    ERROR = "error"        # something failed
    CRITICAL = "critical"  # most urgent level
@dataclass
class NotificationMessage:
    """Provider-independent description of one notification.

    Only ``title`` and ``message`` are required; every other attribute is
    optional and is simply omitted from the rendered payload when unset.
    """

    # Headline of the notification.
    title: str
    # Main body text.
    message: str
    # Severity; defaults to the lowest level.
    level: NotificationLevel = NotificationLevel.INFO
    # Optional name -> value pairs shown as extra fields.
    fields: dict[str, str] | None = None
    # Optional link associated with the notification.
    url: str | None = None
    # Optional footer text.
    footer: str | None = None
class Notifier(ABC):
    """Interface that every notification provider must implement.

    Concrete subclasses have to override both ``send`` and ``send_raw``;
    the class cannot be instantiated until they do.
    """

    @abstractmethod
    def send(self, message: NotificationMessage) -> bool:
        """Deliver a structured notification.

        Args:
            message: The notification message to send.

        Returns:
            True if sent successfully, False otherwise.
        """

    @abstractmethod
    def send_raw(self, payload: dict) -> bool:
        """Deliver an already-built payload to the webhook.

        Args:
            payload: Raw payload in provider-specific format.

        Returns:
            True if sent successfully, False otherwise.
        """
class SlackNotifier(Notifier):
    """Slack webhook notifier.

    Renders a ``NotificationMessage`` as a single Slack attachment and posts
    it to an incoming-webhook URL.
    """

    # Attachment colour for each notification level.
    LEVEL_COLORS = {
        NotificationLevel.INFO: "#36a64f",  # Green
        NotificationLevel.WARNING: "#ffcc00",  # Yellow
        NotificationLevel.ERROR: "#ff6600",  # Orange
        NotificationLevel.CRITICAL: "#cc0000",  # Red
    }

    # Emoji shortcode prefixed to the attachment title for each level.
    LEVEL_EMOJIS = {
        NotificationLevel.INFO: ":information_source:",
        NotificationLevel.WARNING: ":warning:",
        NotificationLevel.ERROR: ":x:",
        NotificationLevel.CRITICAL: ":rotating_light:",
    }

    def __init__(
        self,
        webhook_url: str | None = None,
        channel: str | None = None,
        username: str = "OpenRabbit",
        icon_emoji: str = ":robot_face:",
        timeout: int = 10,
    ):
        """Initialize Slack notifier.

        Args:
            webhook_url: Slack incoming webhook URL.
                Defaults to SLACK_WEBHOOK_URL env var.
            channel: Override channel (optional).
            username: Bot username to display.
            icon_emoji: Bot icon emoji.
            timeout: Request timeout in seconds.
        """
        self.webhook_url = webhook_url or os.environ.get("SLACK_WEBHOOK_URL", "")
        self.channel = channel
        self.username = username
        self.icon_emoji = icon_emoji
        self.timeout = timeout
        self.logger = logging.getLogger(__name__)

    def send(self, message: NotificationMessage) -> bool:
        """Send a notification to Slack.

        Args:
            message: The notification message to send.

        Returns:
            True if the webhook accepted the payload, False if no webhook
            URL is configured or the request failed.
        """
        if not self.webhook_url:
            self.logger.warning("Slack webhook URL not configured")
            return False

        # Build attachment
        attachment = {
            "color": self.LEVEL_COLORS.get(message.level, "#36a64f"),
            "title": f"{self.LEVEL_EMOJIS.get(message.level, '')} {message.title}",
            "text": message.message,
            "mrkdwn_in": ["text", "fields"],
        }
        if message.url:
            attachment["title_link"] = message.url
        if message.fields:
            rendered_fields = []
            for name, raw_value in message.fields.items():
                # Coerce to str so a stray int/None value cannot make
                # len() raise and abort the whole notification.
                value = str(raw_value)
                rendered_fields.append(
                    {"title": name, "value": value, "short": len(value) < 40}
                )
            attachment["fields"] = rendered_fields
        if message.footer:
            attachment["footer"] = message.footer
            attachment["footer_icon"] = "https://github.com/favicon.ico"

        payload = {
            "username": self.username,
            "icon_emoji": self.icon_emoji,
            "attachments": [attachment],
        }
        if self.channel:
            payload["channel"] = self.channel
        return self.send_raw(payload)

    def send_raw(self, payload: dict) -> bool:
        """Send raw payload to Slack webhook.

        Args:
            payload: JSON-serialisable payload in Slack's webhook format.

        Returns:
            True if the POST completed with a success status, False if no
            webhook URL is configured or a ``requests`` error occurred.
        """
        if not self.webhook_url:
            return False
        try:
            response = requests.post(
                self.webhook_url,
                json=payload,
                timeout=self.timeout,
            )
            response.raise_for_status()
        except requests.RequestException as exc:
            # The exception text typically embeds the request URL/path, and
            # a Slack webhook URL is itself the credential. Log only the
            # exception type and HTTP status so the secret stays out of logs.
            status = getattr(getattr(exc, "response", None), "status_code", None)
            self.logger.error(
                "Failed to send Slack notification: %s (HTTP status: %s)",
                type(exc).__name__,
                status,
            )
            return False
        return True
class DiscordNotifier(Notifier):
    """Discord webhook notifier.

    Renders a ``NotificationMessage`` as a single Discord embed and posts it
    to a webhook URL.
    """

    # Color mapping for different levels (Discord uses decimal colors)
    LEVEL_COLORS = {
        NotificationLevel.INFO: 3066993,  # Green
        NotificationLevel.WARNING: 16776960,  # Yellow
        NotificationLevel.ERROR: 16744448,  # Orange
        NotificationLevel.CRITICAL: 13369344,  # Red
    }

    def __init__(
        self,
        webhook_url: str | None = None,
        username: str = "OpenRabbit",
        avatar_url: str | None = None,
        timeout: int = 10,
    ):
        """Initialize Discord notifier.

        Args:
            webhook_url: Discord webhook URL.
                Defaults to DISCORD_WEBHOOK_URL env var.
            username: Bot username to display.
            avatar_url: Bot avatar URL.
            timeout: Request timeout in seconds.
        """
        self.webhook_url = webhook_url or os.environ.get("DISCORD_WEBHOOK_URL", "")
        self.username = username
        self.avatar_url = avatar_url
        self.timeout = timeout
        self.logger = logging.getLogger(__name__)

    def send(self, message: NotificationMessage) -> bool:
        """Send a notification to Discord.

        Args:
            message: The notification message to send.

        Returns:
            True if the webhook accepted the payload, False if no webhook
            URL is configured or the request failed.
        """
        if not self.webhook_url:
            self.logger.warning("Discord webhook URL not configured")
            return False

        # Build embed
        embed = {
            "title": message.title,
            "description": message.message,
            "color": self.LEVEL_COLORS.get(message.level, 3066993),
        }
        if message.url:
            embed["url"] = message.url
        if message.fields:
            rendered_fields = []
            for name, raw_value in message.fields.items():
                # Coerce to str so a stray int/None value cannot make
                # len() raise and abort the whole notification.
                value = str(raw_value)
                rendered_fields.append(
                    {"name": name, "value": value, "inline": len(value) < 40}
                )
            embed["fields"] = rendered_fields
        if message.footer:
            embed["footer"] = {"text": message.footer}

        payload = {
            "username": self.username,
            "embeds": [embed],
        }
        if self.avatar_url:
            payload["avatar_url"] = self.avatar_url
        return self.send_raw(payload)

    def send_raw(self, payload: dict) -> bool:
        """Send raw payload to Discord webhook.

        Args:
            payload: JSON-serialisable payload in Discord's webhook format.

        Returns:
            True if the POST completed with a success status, False if no
            webhook URL is configured or a ``requests`` error occurred.
        """
        if not self.webhook_url:
            return False
        try:
            response = requests.post(
                self.webhook_url,
                json=payload,
                timeout=self.timeout,
            )
            response.raise_for_status()
        except requests.RequestException as exc:
            # The exception text typically embeds the request URL/path, and
            # a Discord webhook URL contains the webhook token. Log only the
            # exception type and HTTP status so the secret stays out of logs.
            status = getattr(getattr(exc, "response", None), "status_code", None)
            self.logger.error(
                "Failed to send Discord notification: %s (HTTP status: %s)",
                type(exc).__name__,
                status,
            )
            return False
        return True
class WebhookNotifier(Notifier):
    """Generic webhook notifier for custom integrations.

    Posts the notification as a flat JSON object to an arbitrary URL.
    """

    def __init__(
        self,
        webhook_url: str,
        headers: dict[str, str] | None = None,
        timeout: int = 10,
    ):
        """Initialize generic webhook notifier.

        Args:
            webhook_url: Webhook URL.
            headers: Custom headers to include. When omitted (or empty), a
                JSON ``Content-Type`` header is used.
            timeout: Request timeout in seconds.
        """
        self.webhook_url = webhook_url
        self.headers = headers or {"Content-Type": "application/json"}
        self.timeout = timeout
        self.logger = logging.getLogger(__name__)

    def send(self, message: NotificationMessage) -> bool:
        """Send a notification as JSON payload.

        Args:
            message: The notification message to send.

        Returns:
            True if sent successfully, False otherwise.
        """
        payload = {
            "title": message.title,
            "message": message.message,
            "level": message.level.value,
            "fields": message.fields or {},
            "url": message.url,
            "footer": message.footer,
        }
        return self.send_raw(payload)

    def send_raw(self, payload: dict) -> bool:
        """Send raw payload to webhook.

        Args:
            payload: JSON-serialisable payload to POST.

        Returns:
            True if the POST completed with a success status, False if a
            ``requests`` error occurred.
        """
        try:
            response = requests.post(
                self.webhook_url,
                json=payload,
                headers=self.headers,
                timeout=self.timeout,
            )
            response.raise_for_status()
        except requests.RequestException as exc:
            # The exception text typically embeds the request URL/path, and
            # a webhook URL may carry a token. Log only the exception type
            # and HTTP status so nothing secret ends up in the logs.
            status = getattr(getattr(exc, "response", None), "status_code", None)
            self.logger.error(
                "Failed to send webhook notification: %s (HTTP status: %s)",
                type(exc).__name__,
                status,
            )
            return False
        return True
class NotifierFactory:
    """Factory for creating notifier instances from config."""

    @staticmethod
    def create_from_config(config: dict) -> list[Notifier]:
        """Create notifier instances from configuration.

        Sections that are missing or explicitly null (e.g. an empty YAML
        key) are treated as empty rather than raising.

        Args:
            config: Configuration dictionary with 'notifications' section.

        Returns:
            List of configured notifier instances. Empty when notifications
            are disabled or no provider has a usable webhook URL.
        """
        notifiers = []
        # `or {}` also covers a key that is present but set to None.
        notifications_config = config.get("notifications") or {}
        if not notifications_config.get("enabled", False):
            return notifiers

        # Slack
        slack_config = notifications_config.get("slack") or {}
        if slack_config.get("enabled", False):
            webhook_url = slack_config.get("webhook_url") or os.environ.get(
                "SLACK_WEBHOOK_URL"
            )
            if webhook_url:
                notifiers.append(
                    SlackNotifier(
                        webhook_url=webhook_url,
                        channel=slack_config.get("channel"),
                        username=slack_config.get("username", "OpenRabbit"),
                    )
                )

        # Discord
        discord_config = notifications_config.get("discord") or {}
        if discord_config.get("enabled", False):
            webhook_url = discord_config.get("webhook_url") or os.environ.get(
                "DISCORD_WEBHOOK_URL"
            )
            if webhook_url:
                notifiers.append(
                    DiscordNotifier(
                        webhook_url=webhook_url,
                        username=discord_config.get("username", "OpenRabbit"),
                        avatar_url=discord_config.get("avatar_url"),
                    )
                )

        # Generic webhooks (enabled by default; skipped when no URL is set)
        for webhook in notifications_config.get("webhooks") or []:
            if webhook.get("enabled", True) and webhook.get("url"):
                notifiers.append(
                    WebhookNotifier(
                        webhook_url=webhook["url"],
                        headers=webhook.get("headers"),
                    )
                )
        return notifiers

    @staticmethod
    def should_notify(
        level: NotificationLevel,
        config: dict,
    ) -> bool:
        """Check if notification should be sent based on level threshold.

        The configured threshold is matched case-insensitively and ignoring
        surrounding whitespace; it defaults to "warning" when absent.

        Args:
            level: Notification level.
            config: Configuration dictionary.

        Returns:
            True if notification should be sent. An unrecognised threshold
            (or level) disables filtering, so True is returned.
        """
        notifications_config = config.get("notifications") or {}
        if not notifications_config.get("enabled", False):
            return False

        threshold = notifications_config.get("threshold", "warning")
        if isinstance(threshold, str):
            # "CRITICAL" or " Warning " would otherwise fall into the
            # ValueError branch below and silently disable filtering.
            threshold = threshold.strip().lower()

        level_order = ["info", "warning", "error", "critical"]
        try:
            threshold_idx = level_order.index(threshold)
            level_idx = level_order.index(level.value)
        except ValueError:
            # Fail open: better to over-notify than to drop an alert.
            return True
        return level_idx >= threshold_idx
class NotificationService:
    """High-level notification service for sending alerts.

    Builds the notifiers described by the configuration once, then fans each
    message out to all of them, subject to the configured level threshold.
    """

    def __init__(self, config: dict):
        """Initialize notification service.

        Args:
            config: Configuration dictionary.
        """
        self.config = config
        self.notifiers = NotifierFactory.create_from_config(config)
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _text(value: Any, default: str) -> str:
        """Return ``value`` as a string, or ``default`` if it is None or empty."""
        if value is None or value == "":
            return default
        return str(value)

    @staticmethod
    def _count(value: Any) -> int:
        """Return ``value`` as an int, or 0 if it is missing or not numeric."""
        try:
            return int(value or 0)
        except (TypeError, ValueError):
            return 0

    def notify(self, message: NotificationMessage) -> bool:
        """Send notification to all configured notifiers.

        Args:
            message: Notification message.

        Returns:
            True if at least one notification succeeded. Also True when
            nothing was attempted because the message is below the
            threshold or no notifiers are configured.
        """
        if not NotifierFactory.should_notify(message.level, self.config):
            return True  # Not an error, just below threshold
        if not self.notifiers:
            self.logger.debug("No notifiers configured")
            return True

        success = False
        for notifier in self.notifiers:
            # Deliberately broad: one misbehaving notifier must not stop
            # the remaining ones from being tried.
            try:
                if notifier.send(message):
                    success = True
            except Exception as e:
                self.logger.error("Notifier failed: %s", e)
        return success

    def notify_security_finding(
        self,
        repo: str,
        pr_number: int,
        finding: dict,
        pr_url: str | None = None,
    ) -> bool:
        """Send notification for a security finding.

        Missing, null or non-string entries in ``finding`` are replaced by
        placeholder text so every field reaches the notifiers as a string.

        Args:
            repo: Repository name (owner/repo).
            pr_number: Pull request number.
            finding: Security finding dict with severity, description, etc.
            pr_url: URL to the pull request.

        Returns:
            True if notification succeeded.
        """
        severity = self._text(finding.get("severity"), "MEDIUM").upper()
        level_map = {
            # A scanner reporting "CRITICAL" must not be quieter than "HIGH".
            "CRITICAL": NotificationLevel.CRITICAL,
            "HIGH": NotificationLevel.CRITICAL,
            "MEDIUM": NotificationLevel.WARNING,
            "LOW": NotificationLevel.INFO,
        }
        # Unknown severities fall back to WARNING.
        level = level_map.get(severity, NotificationLevel.WARNING)

        message = NotificationMessage(
            title=f"Security Finding in {repo} PR #{pr_number}",
            message=self._text(
                finding.get("description"), "Security issue detected"
            ),
            level=level,
            fields={
                "Severity": severity,
                "Category": self._text(finding.get("category"), "Unknown"),
                "File": self._text(finding.get("file"), "N/A"),
                "Line": self._text(finding.get("line"), "N/A"),
            },
            url=pr_url,
            footer=f"CWE: {self._text(finding.get('cwe'), 'N/A')}",
        )
        return self.notify(message)

    def notify_review_complete(
        self,
        repo: str,
        pr_number: int,
        summary: dict,
        pr_url: str | None = None,
    ) -> bool:
        """Send notification for completed review.

        Args:
            repo: Repository name (owner/repo).
            pr_number: Pull request number.
            summary: Review summary dict.
            pr_url: URL to the pull request.

        Returns:
            True if notification succeeded.
        """
        recommendation = self._text(summary.get("recommendation"), "COMMENT")
        level_map = {
            "APPROVE": NotificationLevel.INFO,
            "COMMENT": NotificationLevel.INFO,
            "REQUEST_CHANGES": NotificationLevel.WARNING,
        }
        level = level_map.get(recommendation, NotificationLevel.INFO)

        # Any high-severity issue escalates the level regardless of the
        # recommendation. Coerced to int so a null or string count cannot
        # raise on the comparison.
        high_issues = self._count(summary.get("high_severity_count"))
        if high_issues > 0:
            level = NotificationLevel.ERROR

        message = NotificationMessage(
            title=f"Review Complete: {repo} PR #{pr_number}",
            message=self._text(summary.get("summary"), "AI review completed"),
            level=level,
            fields={
                "Recommendation": recommendation,
                "High Severity": str(high_issues),
                "Medium Severity": self._text(
                    summary.get("medium_severity_count"), "0"
                ),
                "Files Reviewed": self._text(summary.get("files_reviewed"), "0"),
            },
            url=pr_url,
            footer="OpenRabbit AI Review",
        )
        return self.notify(message)

    def notify_error(
        self,
        repo: str,
        error: str,
        context: str | None = None,
    ) -> bool:
        """Send notification for an error.

        Args:
            repo: Repository name.
            error: Error message.
            context: Additional context.

        Returns:
            True if notification succeeded.
        """
        message = NotificationMessage(
            title=f"Error in {repo}",
            message=error,
            level=NotificationLevel.ERROR,
            # The "Context" field is only attached when context is non-empty.
            fields={"Context": context} if context else None,
            footer="OpenRabbit Error Alert",
        )
        return self.notify(message)