This commit is contained in:
2026-01-29 19:53:36 +01:00
parent 1bda2013bb
commit a9708b33e2
27 changed files with 3745 additions and 4 deletions

View File

@@ -0,0 +1,171 @@
"""Audit logging system for MCP tool invocations."""
import json
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional
import structlog
from aegis_gitea_mcp.config import get_settings
class AuditLogger:
"""Audit logger for tracking all MCP tool invocations."""
def __init__(self, log_path: Optional[Path] = None) -> None:
"""Initialize audit logger.
Args:
log_path: Path to audit log file (defaults to config value)
"""
self.settings = get_settings()
self.log_path = log_path or self.settings.audit_log_path
# Ensure log directory exists
self.log_path.parent.mkdir(parents=True, exist_ok=True)
# Configure structlog for audit logging
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.processors.dict_tracebacks,
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.make_filtering_bound_logger(
logging_level=self.settings.log_level
),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(file=self._get_log_file()),
cache_logger_on_first_use=True,
)
self.logger = structlog.get_logger("audit")
def _get_log_file(self) -> Any:
"""Get file handle for audit log."""
return open(self.log_path, "a", encoding="utf-8")
def log_tool_invocation(
self,
tool_name: str,
repository: Optional[str] = None,
target: Optional[str] = None,
params: Optional[Dict[str, Any]] = None,
correlation_id: Optional[str] = None,
result_status: str = "pending",
error: Optional[str] = None,
) -> str:
"""Log an MCP tool invocation.
Args:
tool_name: Name of the MCP tool being invoked
repository: Repository identifier (owner/repo)
target: Target path, commit hash, issue number, etc.
params: Additional parameters passed to the tool
correlation_id: Request correlation ID (auto-generated if not provided)
result_status: Status of the invocation (pending, success, error)
error: Error message if invocation failed
Returns:
Correlation ID for this invocation
"""
if correlation_id is None:
correlation_id = str(uuid.uuid4())
audit_entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"correlation_id": correlation_id,
"tool_name": tool_name,
"repository": repository,
"target": target,
"params": params or {},
"result_status": result_status,
}
if error:
audit_entry["error"] = error
self.logger.info("tool_invocation", **audit_entry)
return correlation_id
def log_access_denied(
self,
tool_name: str,
repository: Optional[str] = None,
reason: str = "unauthorized",
correlation_id: Optional[str] = None,
) -> str:
"""Log an access denial event.
Args:
tool_name: Name of the tool that was denied access
repository: Repository identifier that access was denied to
reason: Reason for denial
correlation_id: Request correlation ID
Returns:
Correlation ID for this event
"""
if correlation_id is None:
correlation_id = str(uuid.uuid4())
self.logger.warning(
"access_denied",
timestamp=datetime.now(timezone.utc).isoformat(),
correlation_id=correlation_id,
tool_name=tool_name,
repository=repository,
reason=reason,
)
return correlation_id
def log_security_event(
self,
event_type: str,
description: str,
severity: str = "medium",
metadata: Optional[Dict[str, Any]] = None,
) -> str:
"""Log a security-related event.
Args:
event_type: Type of security event (e.g., rate_limit, invalid_input)
description: Human-readable description of the event
severity: Severity level (low, medium, high, critical)
metadata: Additional metadata about the event
Returns:
Correlation ID for this event
"""
correlation_id = str(uuid.uuid4())
self.logger.warning(
"security_event",
timestamp=datetime.now(timezone.utc).isoformat(),
correlation_id=correlation_id,
event_type=event_type,
description=description,
severity=severity,
metadata=metadata or {},
)
return correlation_id
# Global audit logger instance
_audit_logger: Optional[AuditLogger] = None
def get_audit_logger() -> AuditLogger:
"""Get or create global audit logger instance."""
global _audit_logger
if _audit_logger is None:
_audit_logger = AuditLogger()
return _audit_logger
def reset_audit_logger() -> None:
"""Reset global audit logger instance (primarily for testing)."""
global _audit_logger
_audit_logger = None