304 lines
8.4 KiB
Python
304 lines
8.4 KiB
Python
"""Audit Logger

Enterprise audit logging for tracking all AI agent actions,
decisions, and interactions for compliance and debugging.
"""
|
|
|
|
import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
|
|
|
|
|
|
class AuditLogger:
|
|
"""Audit logger for enterprise compliance."""
|
|
|
|
def __init__(
|
|
self,
|
|
log_path: str | None = None,
|
|
enabled: bool = True,
|
|
):
|
|
"""Initialize the audit logger.
|
|
|
|
Args:
|
|
log_path: Directory to write audit logs.
|
|
enabled: Whether audit logging is enabled.
|
|
"""
|
|
self.enabled = enabled
|
|
self.log_path = Path(
|
|
log_path or os.environ.get("AI_AUDIT_PATH", "/var/log/ai-review/")
|
|
)
|
|
self.logger = logging.getLogger("audit")
|
|
|
|
if self.enabled:
|
|
self._ensure_log_dir()
|
|
|
|
def _ensure_log_dir(self):
|
|
"""Ensure the log directory exists."""
|
|
try:
|
|
self.log_path.mkdir(parents=True, exist_ok=True)
|
|
except Exception as e:
|
|
self.logger.warning(f"Could not create audit log directory: {e}")
|
|
self.enabled = False
|
|
|
|
def _get_log_file(self) -> Path:
|
|
"""Get the current log file path (daily rotation)."""
|
|
date_str = datetime.utcnow().strftime("%Y-%m-%d")
|
|
return self.log_path / f"audit-{date_str}.jsonl"
|
|
|
|
def log(
|
|
self,
|
|
action: str,
|
|
agent: str,
|
|
owner: str,
|
|
repo: str,
|
|
details: dict[str, Any] | None = None,
|
|
success: bool = True,
|
|
error: str | None = None,
|
|
):
|
|
"""Log an audit event.
|
|
|
|
Args:
|
|
action: Action performed (e.g., "review_pr", "triage_issue").
|
|
agent: Agent name that performed the action.
|
|
owner: Repository owner.
|
|
repo: Repository name.
|
|
details: Additional details about the action.
|
|
success: Whether the action succeeded.
|
|
error: Error message if failed.
|
|
"""
|
|
if not self.enabled:
|
|
return
|
|
|
|
event = {
|
|
"timestamp": datetime.utcnow().isoformat() + "Z",
|
|
"action": action,
|
|
"agent": agent,
|
|
"repository": f"{owner}/{repo}",
|
|
"success": success,
|
|
"details": details or {},
|
|
}
|
|
|
|
if error:
|
|
event["error"] = error
|
|
|
|
try:
|
|
log_file = self._get_log_file()
|
|
with open(log_file, "a") as f:
|
|
f.write(json.dumps(event) + "\n")
|
|
except Exception as e:
|
|
self.logger.error(f"Failed to write audit log: {e}")
|
|
|
|
def log_llm_call(
|
|
self,
|
|
agent: str,
|
|
owner: str,
|
|
repo: str,
|
|
provider: str,
|
|
model: str,
|
|
tokens_used: int | None = None,
|
|
duration_ms: int | None = None,
|
|
):
|
|
"""Log an LLM API call.
|
|
|
|
Args:
|
|
agent: Agent making the call.
|
|
owner: Repository owner.
|
|
repo: Repository name.
|
|
provider: LLM provider used.
|
|
model: Model name.
|
|
tokens_used: Number of tokens consumed.
|
|
duration_ms: Call duration in milliseconds.
|
|
"""
|
|
self.log(
|
|
action="llm_call",
|
|
agent=agent,
|
|
owner=owner,
|
|
repo=repo,
|
|
details={
|
|
"provider": provider,
|
|
"model": model,
|
|
"tokens_used": tokens_used,
|
|
"duration_ms": duration_ms,
|
|
},
|
|
)
|
|
|
|
def log_comment_posted(
|
|
self,
|
|
agent: str,
|
|
owner: str,
|
|
repo: str,
|
|
issue_number: int,
|
|
comment_type: str,
|
|
):
|
|
"""Log a comment being posted.
|
|
|
|
Args:
|
|
agent: Agent posting the comment.
|
|
owner: Repository owner.
|
|
repo: Repository name.
|
|
issue_number: Issue or PR number.
|
|
comment_type: Type of comment (triage, review, response).
|
|
"""
|
|
self.log(
|
|
action="comment_posted",
|
|
agent=agent,
|
|
owner=owner,
|
|
repo=repo,
|
|
details={
|
|
"issue_number": issue_number,
|
|
"comment_type": comment_type,
|
|
},
|
|
)
|
|
|
|
def log_labels_applied(
|
|
self,
|
|
agent: str,
|
|
owner: str,
|
|
repo: str,
|
|
issue_number: int,
|
|
labels: list[str],
|
|
):
|
|
"""Log labels being applied.
|
|
|
|
Args:
|
|
agent: Agent applying labels.
|
|
owner: Repository owner.
|
|
repo: Repository name.
|
|
issue_number: Issue or PR number.
|
|
labels: Labels applied.
|
|
"""
|
|
self.log(
|
|
action="labels_applied",
|
|
agent=agent,
|
|
owner=owner,
|
|
repo=repo,
|
|
details={
|
|
"issue_number": issue_number,
|
|
"labels": labels,
|
|
},
|
|
)
|
|
|
|
def get_logs(
|
|
self,
|
|
start_date: str | None = None,
|
|
end_date: str | None = None,
|
|
action: str | None = None,
|
|
repository: str | None = None,
|
|
) -> list[dict]:
|
|
"""Retrieve audit logs with optional filtering.
|
|
|
|
Args:
|
|
start_date: Start date (YYYY-MM-DD).
|
|
end_date: End date (YYYY-MM-DD).
|
|
action: Filter by action type.
|
|
repository: Filter by repository (owner/repo).
|
|
|
|
Returns:
|
|
List of audit log entries.
|
|
"""
|
|
if not self.enabled:
|
|
return []
|
|
|
|
logs = []
|
|
log_files = sorted(self.log_path.glob("audit-*.jsonl"))
|
|
|
|
for log_file in log_files:
|
|
# Date filter on filename
|
|
file_date = log_file.stem.replace("audit-", "")
|
|
if start_date and file_date < start_date:
|
|
continue
|
|
if end_date and file_date > end_date:
|
|
continue
|
|
|
|
try:
|
|
with open(log_file) as f:
|
|
for line in f:
|
|
try:
|
|
entry = json.loads(line.strip())
|
|
|
|
# Apply filters
|
|
if action and entry.get("action") != action:
|
|
continue
|
|
if repository and entry.get("repository") != repository:
|
|
continue
|
|
|
|
logs.append(entry)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
except Exception:
|
|
continue
|
|
|
|
return logs
|
|
|
|
def generate_report(
|
|
self,
|
|
start_date: str | None = None,
|
|
end_date: str | None = None,
|
|
) -> dict:
|
|
"""Generate a summary report of audit activity.
|
|
|
|
Args:
|
|
start_date: Report start date.
|
|
end_date: Report end date.
|
|
|
|
Returns:
|
|
Summary report dictionary.
|
|
"""
|
|
logs = self.get_logs(start_date=start_date, end_date=end_date)
|
|
|
|
report = {
|
|
"period": {
|
|
"start": start_date or "all",
|
|
"end": end_date or "all",
|
|
},
|
|
"total_events": len(logs),
|
|
"by_action": {},
|
|
"by_repository": {},
|
|
"by_agent": {},
|
|
"success_rate": 0.0,
|
|
"llm_usage": {
|
|
"total_calls": 0,
|
|
"total_tokens": 0,
|
|
},
|
|
}
|
|
|
|
success_count = 0
|
|
|
|
for log in logs:
|
|
action = log.get("action", "unknown")
|
|
repo = log.get("repository", "unknown")
|
|
agent = log.get("agent", "unknown")
|
|
|
|
report["by_action"][action] = report["by_action"].get(action, 0) + 1
|
|
report["by_repository"][repo] = report["by_repository"].get(repo, 0) + 1
|
|
report["by_agent"][agent] = report["by_agent"].get(agent, 0) + 1
|
|
|
|
if log.get("success"):
|
|
success_count += 1
|
|
|
|
if action == "llm_call":
|
|
report["llm_usage"]["total_calls"] += 1
|
|
tokens = log.get("details", {}).get("tokens_used")
|
|
if tokens:
|
|
report["llm_usage"]["total_tokens"] += tokens
|
|
|
|
if logs:
|
|
report["success_rate"] = success_count / len(logs)
|
|
|
|
return report
|
|
|
|
|
|
# Module-level shared AuditLogger instance. Starts as None and is created
# on first use by get_audit_logger().
_audit_logger: AuditLogger | None = None
|
|
|
|
|
|
def get_audit_logger() -> AuditLogger:
    """Return the shared module-level AuditLogger, creating it on first call."""
    global _audit_logger
    if _audit_logger is not None:
        return _audit_logger
    # First call: build the instance with default settings and cache it.
    _audit_logger = AuditLogger()
    return _audit_logger
|