feat: add API key authentication system for ChatGPT Business

Implements comprehensive Bearer token authentication to ensure only
authorized ChatGPT workspaces can access the MCP server.

Core Features:
- API key validation with constant-time comparison
- Multi-key support for rotation grace periods
- Rate limiting (5 failures per IP per 5 min)
- Comprehensive audit logging of all auth attempts
- IP-based failed attempt tracking

Key Management:
- generate_api_key.py: Create secure 64-char keys
- rotate_api_key.py: Guided key rotation with backup
- check_key_age.py: Automated expiration monitoring

Infrastructure:
- Traefik labels for HTTPS and rate limiting
- Security headers (HSTS, CSP, X-Frame-Options)
- Environment-based configuration
- Docker secrets support

Documentation:
- AUTH_SETUP.md: Complete authentication setup guide
- CHATGPT_SETUP.md: ChatGPT Business integration guide
- KEY_ROTATION.md: Key rotation procedures and automation

Security:
- Read-only operations enforced
- No write access to Gitea possible
- All auth attempts logged with correlation IDs
- Failed attempts trigger IP rate limits
- Keys never logged in full (only hints)

Breaking Changes:
- AUTH_ENABLED defaults to true
- MCP_API_KEYS environment variable now required
- Minimum key length: 32 characters (64 recommended)

Migration:
1. Generate API key: make generate-key
2. Add to .env: MCP_API_KEYS=<generated-key>
3. Restart: docker-compose restart aegis-mcp
4. Configure ChatGPT with Authorization header

Closes requirements for ChatGPT Business exclusive access.
This commit is contained in:
2026-01-29 20:05:49 +01:00
parent a9708b33e2
commit eeaad748a6
13 changed files with 2263 additions and 4 deletions

243
src/aegis_gitea_mcp/auth.py Normal file
View File

@@ -0,0 +1,243 @@
"""Authentication module for MCP server API key validation."""
import hashlib
import hmac
import secrets
from datetime import datetime, timezone
from typing import Optional, Tuple
from aegis_gitea_mcp.audit import get_audit_logger
from aegis_gitea_mcp.config import get_settings
class AuthenticationError(Exception):
"""Raised when authentication fails."""
pass
class APIKeyValidator:
"""Validates API keys for MCP server access."""
def __init__(self) -> None:
"""Initialize API key validator."""
self.settings = get_settings()
self.audit = get_audit_logger()
self._failed_attempts: dict[str, list[datetime]] = {}
def _constant_time_compare(self, a: str, b: str) -> bool:
"""Compare two strings in constant time to prevent timing attacks.
Args:
a: First string
b: Second string
Returns:
True if strings are equal, False otherwise
"""
return hmac.compare_digest(a, b)
def _check_rate_limit(self, identifier: str) -> bool:
"""Check if identifier has exceeded failed authentication rate limit.
Args:
identifier: IP address or other identifier
Returns:
True if within rate limit, False if exceeded
"""
now = datetime.now(timezone.utc)
window_start = now.timestamp() - self.settings.auth_failure_window
# Clean up old attempts
if identifier in self._failed_attempts:
self._failed_attempts[identifier] = [
attempt
for attempt in self._failed_attempts[identifier]
if attempt.timestamp() > window_start
]
# Check count
attempt_count = len(self._failed_attempts.get(identifier, []))
return attempt_count < self.settings.max_auth_failures
def _record_failed_attempt(self, identifier: str) -> None:
"""Record a failed authentication attempt.
Args:
identifier: IP address or other identifier
"""
now = datetime.now(timezone.utc)
if identifier not in self._failed_attempts:
self._failed_attempts[identifier] = []
self._failed_attempts[identifier].append(now)
# Check if threshold exceeded
if len(self._failed_attempts[identifier]) >= self.settings.max_auth_failures:
self.audit.log_security_event(
event_type="auth_rate_limit_exceeded",
description=f"IP {identifier} exceeded auth failure threshold",
severity="high",
metadata={
"identifier": identifier,
"failure_count": len(self._failed_attempts[identifier]),
"window_seconds": self.settings.auth_failure_window,
},
)
def validate_api_key(
self, provided_key: Optional[str], client_ip: str, user_agent: str
) -> Tuple[bool, Optional[str]]:
"""Validate an API key.
Args:
provided_key: API key provided by client
client_ip: Client IP address
user_agent: Client user agent string
Returns:
Tuple of (is_valid, error_message)
"""
# Check if authentication is enabled
if not self.settings.auth_enabled:
self.audit.log_security_event(
event_type="auth_disabled",
description="Authentication is disabled - allowing all requests",
severity="critical",
metadata={"client_ip": client_ip},
)
return True, None
# Check rate limit
if not self._check_rate_limit(client_ip):
self.audit.log_access_denied(
tool_name="api_authentication",
reason="rate_limit_exceeded",
)
return False, "Too many failed authentication attempts. Please try again later."
# Check if key was provided
if not provided_key:
self._record_failed_attempt(client_ip)
self.audit.log_access_denied(
tool_name="api_authentication",
reason="missing_api_key",
)
return False, "Authorization header missing. Required: Authorization: Bearer <api-key>"
# Validate key format (should be at least 32 characters)
if len(provided_key) < 32:
self._record_failed_attempt(client_ip)
self.audit.log_access_denied(
tool_name="api_authentication",
reason="invalid_key_format",
)
return False, "Invalid API key format"
# Get valid API keys from config
valid_keys = self.settings.mcp_api_keys
if not valid_keys:
self.audit.log_security_event(
event_type="no_api_keys_configured",
description="No API keys configured in environment",
severity="critical",
metadata={"client_ip": client_ip},
)
return False, "Server configuration error: No API keys configured"
# Check against all valid keys (constant time comparison)
is_valid = any(self._constant_time_compare(provided_key, valid_key) for valid_key in valid_keys)
if is_valid:
# Success - log and return
key_hint = f"{provided_key[:8]}...{provided_key[-4:]}"
self.audit.log_tool_invocation(
tool_name="api_authentication",
result_status="success",
params={"client_ip": client_ip, "user_agent": user_agent, "key_hint": key_hint},
)
return True, None
else:
# Failure - record attempt and log
self._record_failed_attempt(client_ip)
key_hint = f"{provided_key[:8]}..." if len(provided_key) >= 8 else "too_short"
self.audit.log_access_denied(
tool_name="api_authentication",
reason="invalid_api_key",
)
self.audit.log_security_event(
event_type="invalid_api_key_attempt",
description=f"Invalid API key attempted from {client_ip}",
severity="medium",
metadata={
"client_ip": client_ip,
"user_agent": user_agent,
"key_hint": key_hint,
},
)
return False, "Invalid API key"
def extract_bearer_token(self, authorization_header: Optional[str]) -> Optional[str]:
"""Extract bearer token from Authorization header.
Args:
authorization_header: Authorization header value
Returns:
Extracted token or None if invalid format
"""
if not authorization_header:
return None
parts = authorization_header.split()
if len(parts) != 2:
return None
scheme, token = parts
if scheme.lower() != "bearer":
return None
return token
def generate_api_key(length: int = 64) -> str:
"""Generate a cryptographically secure API key.
Args:
length: Length of the key in characters (default: 64)
Returns:
Generated API key as hex string
"""
return secrets.token_hex(length // 2)
def hash_api_key(api_key: str) -> str:
"""Hash an API key for secure storage (future use).
Args:
api_key: Plain text API key
Returns:
SHA256 hash of the key
"""
return hashlib.sha256(api_key.encode()).hexdigest()
# Global validator instance
_validator: Optional[APIKeyValidator] = None
def get_validator() -> APIKeyValidator:
"""Get or create global API key validator instance."""
global _validator
if _validator is None:
_validator = APIKeyValidator()
return _validator
def reset_validator() -> None:
"""Reset global validator instance (primarily for testing)."""
global _validator
_validator = None