Files
openrabbit/tools/ai-review/compliance/audit_trail.py
latte e8d28225e0
All checks were successful
AI Codebase Quality Review / ai-codebase-review (push) Successful in 39s
just why not
2026-01-07 21:19:46 +01:00

431 lines
13 KiB
Python

"""Audit Trail
Provides comprehensive audit logging for compliance requirements.
Supports HIPAA, SOC2, and other regulatory frameworks.
"""
import hashlib
import json
import logging
import os
import sys
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any
class AuditAction(Enum):
    """Closed set of action identifiers that an audit event can carry.

    Every member's value is the lowercase string form of its name.
    """

    # -- review lifecycle --
    REVIEW_STARTED = "review_started"
    REVIEW_COMPLETED = "review_completed"
    REVIEW_FAILED = "review_failed"

    # -- security scanning --
    SECURITY_SCAN_STARTED = "security_scan_started"
    SECURITY_SCAN_COMPLETED = "security_scan_completed"
    SECURITY_FINDING_DETECTED = "security_finding_detected"
    SECURITY_FINDING_RESOLVED = "security_finding_resolved"

    # -- comments --
    COMMENT_POSTED = "comment_posted"
    COMMENT_UPDATED = "comment_updated"
    COMMENT_DELETED = "comment_deleted"

    # -- labels --
    LABEL_ADDED = "label_added"
    LABEL_REMOVED = "label_removed"

    # -- configuration --
    CONFIG_LOADED = "config_loaded"
    CONFIG_CHANGED = "config_changed"

    # -- access --
    API_CALL = "api_call"
    AUTHENTICATION = "authentication"

    # -- approvals --
    APPROVAL_GRANTED = "approval_granted"
    APPROVAL_REVOKED = "approval_revoked"
    CHANGES_REQUESTED = "changes_requested"
@dataclass
class AuditEvent:
"""An auditable event."""
action: AuditAction
timestamp: str
actor: str
resource_type: str
resource_id: str
repository: str
details: dict[str, Any] = field(default_factory=dict)
outcome: str = "success"
error: str | None = None
correlation_id: str | None = None
checksum: str | None = None
def __post_init__(self):
"""Calculate checksum for integrity verification."""
if not self.checksum:
self.checksum = self._calculate_checksum()
def _calculate_checksum(self) -> str:
"""Calculate SHA-256 checksum of event data."""
data = {
"action": self.action.value
if isinstance(self.action, AuditAction)
else self.action,
"timestamp": self.timestamp,
"actor": self.actor,
"resource_type": self.resource_type,
"resource_id": self.resource_id,
"repository": self.repository,
"details": self.details,
"outcome": self.outcome,
"error": self.error,
}
json_str = json.dumps(data, sort_keys=True)
return hashlib.sha256(json_str.encode()).hexdigest()
def to_dict(self) -> dict:
"""Convert event to dictionary."""
data = asdict(self)
if isinstance(self.action, AuditAction):
data["action"] = self.action.value
return data
def to_json(self) -> str:
"""Convert event to JSON string."""
return json.dumps(self.to_dict())
class AuditLogger:
"""Logger for audit events."""
def __init__(
self,
log_file: str | None = None,
log_to_stdout: bool = False,
log_level: str = "INFO",
):
"""Initialize audit logger.
Args:
log_file: Path to audit log file.
log_to_stdout: Also log to stdout.
log_level: Logging level.
"""
self.log_file = log_file
self.log_to_stdout = log_to_stdout
self.logger = logging.getLogger("audit")
self.logger.setLevel(getattr(logging, log_level.upper(), logging.INFO))
# Clear existing handlers
self.logger.handlers = []
# Add file handler if specified
if log_file:
log_dir = os.path.dirname(log_file)
if log_dir:
os.makedirs(log_dir, exist_ok=True)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(
logging.Formatter("%(message)s") # JSON lines format
)
self.logger.addHandler(file_handler)
# Add stdout handler if requested
if log_to_stdout:
stdout_handler = logging.StreamHandler()
stdout_handler.setFormatter(logging.Formatter("[AUDIT] %(message)s"))
self.logger.addHandler(stdout_handler)
def log(self, event: AuditEvent):
"""Log an audit event.
Args:
event: The audit event to log.
"""
self.logger.info(event.to_json())
def log_action(
self,
action: AuditAction,
actor: str,
resource_type: str,
resource_id: str,
repository: str,
details: dict | None = None,
outcome: str = "success",
error: str | None = None,
correlation_id: str | None = None,
):
"""Log an action as an audit event.
Args:
action: The action being performed.
actor: Who performed the action.
resource_type: Type of resource affected.
resource_id: ID of the resource.
repository: Repository context.
details: Additional details.
outcome: success, failure, or partial.
error: Error message if failed.
correlation_id: ID to correlate related events.
"""
event = AuditEvent(
action=action,
timestamp=datetime.now(timezone.utc).isoformat(),
actor=actor,
resource_type=resource_type,
resource_id=resource_id,
repository=repository,
details=details or {},
outcome=outcome,
error=error,
correlation_id=correlation_id,
)
self.log(event)
class AuditTrail:
"""High-level audit trail management."""
def __init__(self, config: dict):
"""Initialize audit trail.
Args:
config: Configuration dictionary.
"""
self.config = config
compliance_config = config.get("compliance", {})
audit_config = compliance_config.get("audit", {})
self.enabled = audit_config.get("enabled", False)
self.log_file = audit_config.get("log_file", "audit.log")
self.log_to_stdout = audit_config.get("log_to_stdout", False)
self.retention_days = audit_config.get("retention_days", 90)
if self.enabled:
self.logger = AuditLogger(
log_file=self.log_file,
log_to_stdout=self.log_to_stdout,
)
else:
self.logger = None
self._correlation_id = None
def set_correlation_id(self, correlation_id: str):
"""Set correlation ID for subsequent events.
Args:
correlation_id: ID to correlate related events.
"""
self._correlation_id = correlation_id
def log(
self,
action: AuditAction,
actor: str,
resource_type: str,
resource_id: str,
repository: str,
details: dict | None = None,
outcome: str = "success",
error: str | None = None,
):
"""Log an audit event.
Args:
action: The action being performed.
actor: Who performed the action.
resource_type: Type of resource (pr, issue, comment, etc).
resource_id: ID of the resource.
repository: Repository (owner/repo).
details: Additional details.
outcome: success, failure, or partial.
error: Error message if failed.
"""
if not self.enabled or not self.logger:
return
self.logger.log_action(
action=action,
actor=actor,
resource_type=resource_type,
resource_id=resource_id,
repository=repository,
details=details,
outcome=outcome,
error=error,
correlation_id=self._correlation_id,
)
def log_review_started(
self,
repository: str,
pr_number: int,
reviewer: str = "openrabbit",
):
"""Log that a review has started."""
self.log(
action=AuditAction.REVIEW_STARTED,
actor=reviewer,
resource_type="pull_request",
resource_id=str(pr_number),
repository=repository,
)
def log_review_completed(
self,
repository: str,
pr_number: int,
recommendation: str,
findings_count: int,
reviewer: str = "openrabbit",
):
"""Log that a review has completed."""
self.log(
action=AuditAction.REVIEW_COMPLETED,
actor=reviewer,
resource_type="pull_request",
resource_id=str(pr_number),
repository=repository,
details={
"recommendation": recommendation,
"findings_count": findings_count,
},
)
def log_security_finding(
self,
repository: str,
pr_number: int,
finding: dict,
scanner: str = "openrabbit",
):
"""Log a security finding."""
self.log(
action=AuditAction.SECURITY_FINDING_DETECTED,
actor=scanner,
resource_type="pull_request",
resource_id=str(pr_number),
repository=repository,
details={
"severity": finding.get("severity"),
"category": finding.get("category"),
"file": finding.get("file"),
"line": finding.get("line"),
"cwe": finding.get("cwe"),
},
)
def log_approval(
self,
repository: str,
pr_number: int,
approver: str,
approval_type: str = "ai",
):
"""Log an approval action."""
self.log(
action=AuditAction.APPROVAL_GRANTED,
actor=approver,
resource_type="pull_request",
resource_id=str(pr_number),
repository=repository,
details={"approval_type": approval_type},
)
def log_changes_requested(
self,
repository: str,
pr_number: int,
requester: str,
reason: str | None = None,
):
"""Log a changes requested action."""
self.log(
action=AuditAction.CHANGES_REQUESTED,
actor=requester,
resource_type="pull_request",
resource_id=str(pr_number),
repository=repository,
details={"reason": reason} if reason else {},
)
def generate_report(
self,
start_date: datetime | None = None,
end_date: datetime | None = None,
repository: str | None = None,
) -> dict:
"""Generate an audit report.
Args:
start_date: Start of reporting period.
end_date: End of reporting period.
repository: Filter by repository.
Returns:
Report dictionary with statistics and events.
"""
if not self.log_file or not os.path.exists(self.log_file):
return {"events": [], "statistics": {}}
events = []
with open(self.log_file) as f:
for line in f:
try:
event = json.loads(line.strip())
event_time = datetime.fromisoformat(
event["timestamp"].replace("Z", "+00:00")
)
# Apply filters
if start_date and event_time < start_date:
continue
if end_date and event_time > end_date:
continue
if repository and event.get("repository") != repository:
continue
events.append(event)
except (json.JSONDecodeError, KeyError):
continue
# Calculate statistics
action_counts = {}
outcome_counts = {"success": 0, "failure": 0, "partial": 0}
security_findings = 0
for event in events:
action = event.get("action", "unknown")
action_counts[action] = action_counts.get(action, 0) + 1
outcome = event.get("outcome", "success")
if outcome in outcome_counts:
outcome_counts[outcome] += 1
if action == "security_finding_detected":
security_findings += 1
return {
"events": events,
"statistics": {
"total_events": len(events),
"action_counts": action_counts,
"outcome_counts": outcome_counts,
"security_findings": security_findings,
},
"period": {
"start": start_date.isoformat() if start_date else None,
"end": end_date.isoformat() if end_date else None,
},
}