"""Audit logging system for MCP tool invocations.""" import json import uuid from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, Optional import structlog from aegis_gitea_mcp.config import get_settings class AuditLogger: """Audit logger for tracking all MCP tool invocations.""" def __init__(self, log_path: Optional[Path] = None) -> None: """Initialize audit logger. Args: log_path: Path to audit log file (defaults to config value) """ self.settings = get_settings() self.log_path = log_path or self.settings.audit_log_path # Ensure log directory exists self.log_path.parent.mkdir(parents=True, exist_ok=True) # Configure structlog for audit logging structlog.configure( processors=[ structlog.processors.TimeStamper(fmt="iso", utc=True), structlog.processors.dict_tracebacks, structlog.processors.JSONRenderer(), ], wrapper_class=structlog.BoundLogger, context_class=dict, logger_factory=structlog.PrintLoggerFactory(file=self._get_log_file()), cache_logger_on_first_use=True, ) self.logger = structlog.get_logger("audit") def _get_log_file(self) -> Any: """Get file handle for audit log.""" return open(self.log_path, "a", encoding="utf-8") def log_tool_invocation( self, tool_name: str, repository: Optional[str] = None, target: Optional[str] = None, params: Optional[Dict[str, Any]] = None, correlation_id: Optional[str] = None, result_status: str = "pending", error: Optional[str] = None, ) -> str: """Log an MCP tool invocation. Args: tool_name: Name of the MCP tool being invoked repository: Repository identifier (owner/repo) target: Target path, commit hash, issue number, etc. params: Additional parameters passed to the tool correlation_id: Request correlation ID (auto-generated if not provided) result_status: Status of the invocation (pending, success, error) error: Error message if invocation failed Returns: Correlation ID for this invocation """ if correlation_id is None: correlation_id = str(uuid.uuid4()) audit_entry = { "timestamp": datetime.now(timezone.utc).isoformat(), "correlation_id": correlation_id, "tool_name": tool_name, "repository": repository, "target": target, "params": params or {}, "result_status": result_status, } if error: audit_entry["error"] = error self.logger.info("tool_invocation", **audit_entry) return correlation_id def log_access_denied( self, tool_name: str, repository: Optional[str] = None, reason: str = "unauthorized", correlation_id: Optional[str] = None, ) -> str: """Log an access denial event. Args: tool_name: Name of the tool that was denied access repository: Repository identifier that access was denied to reason: Reason for denial correlation_id: Request correlation ID Returns: Correlation ID for this event """ if correlation_id is None: correlation_id = str(uuid.uuid4()) self.logger.warning( "access_denied", timestamp=datetime.now(timezone.utc).isoformat(), correlation_id=correlation_id, tool_name=tool_name, repository=repository, reason=reason, ) return correlation_id def log_security_event( self, event_type: str, description: str, severity: str = "medium", metadata: Optional[Dict[str, Any]] = None, ) -> str: """Log a security-related event. Args: event_type: Type of security event (e.g., rate_limit, invalid_input) description: Human-readable description of the event severity: Severity level (low, medium, high, critical) metadata: Additional metadata about the event Returns: Correlation ID for this event """ correlation_id = str(uuid.uuid4()) self.logger.warning( "security_event", timestamp=datetime.now(timezone.utc).isoformat(), correlation_id=correlation_id, event_type=event_type, description=description, severity=severity, metadata=metadata or {}, ) return correlation_id # Global audit logger instance _audit_logger: Optional[AuditLogger] = None def get_audit_logger() -> AuditLogger: """Get or create global audit logger instance.""" global _audit_logger if _audit_logger is None: _audit_logger = AuditLogger() return _audit_logger def reset_audit_logger() -> None: """Reset global audit logger instance (primarily for testing).""" global _audit_logger _audit_logger = None