feat: harden gateway with policy engine, secure tools, and governance docs

This commit is contained in:
2026-02-14 16:05:56 +01:00
parent e17d34e6d7
commit 5969892af3
55 changed files with 4711 additions and 1587 deletions

View File

@@ -1,11 +1,16 @@
"""Configuration management for AegisGitea MCP server."""
from __future__ import annotations
from pathlib import Path
from typing import Optional
from pydantic import Field, HttpUrl, field_validator, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
# Log level names accepted by the ``log_level`` setting.
_ALLOWED_LOG_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}
# Values accepted by the ``secret_detection_mode`` setting.
_ALLOWED_SECRET_MODES = {"off", "mask", "block"}
# Values accepted by the ``environment`` setting.
_ALLOWED_ENVIRONMENTS = {"development", "staging", "production", "test"}
class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
@@ -15,64 +20,86 @@ class Settings(BaseSettings):
env_file_encoding="utf-8",
case_sensitive=False,
extra="ignore",
# Don't try to parse env vars as JSON for complex types
env_parse_none_str="null",
)
# Runtime environment
environment: str = Field(
    default="production",
    description="Runtime environment name",
)

# Gitea configuration (both values are required; there is no default)
gitea_url: HttpUrl = Field(..., description="Base URL of the Gitea instance")
gitea_token: str = Field(..., description="Bot user access token for Gitea API", min_length=1)

# MCP server configuration
mcp_host: str = Field(
    # Loopback by default; wildcard binds need allow_insecure_bind.
    default="127.0.0.1",
    description="Host interface to bind MCP server to",
)
mcp_port: int = Field(default=8080, description="Port to bind MCP server to", ge=1, le=65535)
allow_insecure_bind: bool = Field(
    default=False,
    description="Allow binding to 0.0.0.0 (disabled by default for local hardening)",
)

# Logging and observability
log_level: str = Field(default="INFO", description="Application logging level")
audit_log_path: Path = Field(
    default=Path("/var/log/aegis-mcp/audit.log"),
    description="Path to tamper-evident audit log file",
)
metrics_enabled: bool = Field(default=True, description="Enable Prometheus metrics endpoint")
expose_error_details: bool = Field(
    default=False,
    description="Return internal error details in API responses (disabled by default)",
)
startup_validate_gitea: bool = Field(
    default=True,
    description="Validate Gitea connectivity during startup",
)

# Security limits
max_file_size_bytes: int = Field(
    default=1_048_576,  # 1 MiB
    description="Maximum file size that can be read (bytes)",
    ge=1,
)
request_timeout_seconds: int = Field(
    default=30,
    description="Timeout for Gitea API requests (seconds)",
    ge=1,
)
rate_limit_per_minute: int = Field(
    default=60,
    description="Maximum requests per minute for a single IP",
    ge=1,
)
token_rate_limit_per_minute: int = Field(
    default=120,
    description="Maximum requests per minute per authenticated token",
    ge=1,
)
max_tool_response_items: int = Field(
    default=200,
    description="Maximum list items returned by a tool response",
    ge=1,
)
max_tool_response_chars: int = Field(
    default=20_000,
    description="Maximum characters returned in text fields",
    ge=1,
)
secret_detection_mode: str = Field(
    default="mask",
    description="Secret detection mode: off, mask, or block",
)

# Authentication configuration
auth_enabled: bool = Field(
    default=True,
    description="Enable API key authentication (disable only in controlled testing)",
)
mcp_api_keys_raw: str = Field(
default="",
@@ -81,81 +108,149 @@ class Settings(BaseSettings):
)
max_auth_failures: int = Field(
    default=5,
    description="Maximum authentication failures before auth rate limiting",
    ge=1,
)
auth_failure_window: int = Field(
    default=300,  # 5 minutes
    description="Time window for counting auth failures (seconds)",
    ge=1,
)

# Policy and write-mode configuration
policy_file_path: Path = Field(
    default=Path("policy.yaml"),
    description="Path to YAML authorization policy file",
)
write_mode: bool = Field(default=False, description="Enable write-capable tools")
write_repository_whitelist_raw: str = Field(
    default="",
    description="Comma-separated repository whitelist for write mode (owner/repo)",
    alias="WRITE_REPOSITORY_WHITELIST",
)
automation_enabled: bool = Field(
    default=False,
    description="Enable automation endpoints and workflows",
)
automation_scheduler_enabled: bool = Field(
    default=False,
    description="Enable built-in scheduled job loop",
)
automation_stale_days: int = Field(
    default=30,
    description="Number of days before an issue is considered stale",
    ge=1,
)
@field_validator("environment")
@classmethod
def validate_environment(cls, value: str) -> str:
    """Normalize and validate the deployment environment name.

    Args:
        value: Raw environment name as supplied by configuration.

    Returns:
        The name with surrounding whitespace removed and lower-cased.

    Raises:
        ValueError: If the normalized name is not in ``_ALLOWED_ENVIRONMENTS``.
    """
    normalized = value.strip().lower()
    if normalized not in _ALLOWED_ENVIRONMENTS:
        # Sort the choices so the message is stable; set ordering is not.
        raise ValueError(f"environment must be one of {sorted(_ALLOWED_ENVIRONMENTS)}")
    return normalized
@field_validator("log_level")
@classmethod
def validate_log_level(cls, value: str) -> str:
    """Normalize and validate the logging level.

    Args:
        value: Raw log level name as supplied by configuration.

    Returns:
        The level name with surrounding whitespace removed and upper-cased.

    Raises:
        ValueError: If the normalized name is not in ``_ALLOWED_LOG_LEVELS``.
    """
    # Strip as well as upper-case, matching how the other string settings
    # are normalized, so a value such as " info " is accepted.
    normalized = value.strip().upper()
    if normalized not in _ALLOWED_LOG_LEVELS:
        # Sort the choices so the message is stable; set ordering is not.
        raise ValueError(f"log_level must be one of {sorted(_ALLOWED_LOG_LEVELS)}")
    return normalized
@field_validator("gitea_token")
@classmethod
def validate_token_not_empty(cls, value: str) -> str:
    """Validate that the Gitea token is non-empty and return it trimmed.

    Args:
        value: Raw token string as supplied by configuration.

    Returns:
        The token with surrounding whitespace removed.

    Raises:
        ValueError: If the token is empty or consists only of whitespace.
    """
    cleaned = value.strip()
    if not cleaned:
        raise ValueError("gitea_token cannot be empty or whitespace")
    return cleaned
@field_validator("secret_detection_mode")
@classmethod
def validate_secret_detection_mode(cls, value: str) -> str:
    """Normalize and validate the secret detection mode.

    Args:
        value: Raw mode name as supplied by configuration.

    Returns:
        The mode lower-cased with surrounding whitespace removed.

    Raises:
        ValueError: If the normalized mode is not in ``_ALLOWED_SECRET_MODES``.
    """
    normalized = value.lower().strip()
    if normalized not in _ALLOWED_SECRET_MODES:
        # Sort the choices so the message is stable; set ordering is not.
        raise ValueError(
            f"secret_detection_mode must be one of {sorted(_ALLOWED_SECRET_MODES)}"
        )
    return normalized
@model_validator(mode="after")
def validate_security_constraints(self) -> Settings:
    """Parse comma-separated settings and enforce cross-field security rules.

    The parsed API keys and the write-mode repository whitelist are stored
    on the instance as ``_mcp_api_keys`` and ``_write_repository_whitelist``.

    Returns:
        This settings instance, once every constraint has passed.

    Raises:
        ValueError: If a whitelist entry is not in ``owner/repo`` form, if
            the server would bind to a wildcard address without
            ``allow_insecure_bind``, if authentication is enabled without
            any API key, if an API key is shorter than 32 characters, or
            if write mode is enabled with an empty whitelist.
    """
    import ipaddress  # function-scope import keeps this block self-contained

    # Blank segments (including an entirely empty setting) are dropped.
    parsed_keys = [key.strip() for key in self.mcp_api_keys_raw.split(",") if key.strip()]
    # Bypass pydantic's attribute handling to stash the parsed list.
    object.__setattr__(self, "_mcp_api_keys", parsed_keys)

    write_repositories = [
        entry.strip()
        for entry in self.write_repository_whitelist_raw.split(",")
        if entry.strip()
    ]
    for repository in write_repositories:
        owner, _, repo_name = repository.partition("/")
        # Exactly one separator with non-empty text on both sides; a bare
        # '"/" in repository' test would accept "/repo", "owner/" or "a/b/c".
        if not owner or not repo_name or "/" in repo_name:
            raise ValueError("WRITE_REPOSITORY_WHITELIST entries must be in owner/repo format")
    object.__setattr__(self, "_write_repository_whitelist", write_repositories)

    # Security decision: binding all interfaces requires explicit opt-in.
    # The unspecified address covers both "0.0.0.0" and the IPv6 "::".
    try:
        binds_all_interfaces = ipaddress.ip_address(self.mcp_host.strip("[]")).is_unspecified
    except ValueError:
        # Not an IP literal (e.g. a hostname such as "localhost").
        binds_all_interfaces = False
    if binds_all_interfaces and not self.allow_insecure_bind:
        raise ValueError(
            f"Binding to {self.mcp_host} is blocked by default. "
            "Set ALLOW_INSECURE_BIND=true to explicitly permit this."
        )

    if self.auth_enabled and not parsed_keys:
        raise ValueError(
            "At least one API key must be configured when auth_enabled=True. "
            "Set MCP_API_KEYS or disable auth explicitly for controlled testing."
        )

    # Enforce minimum key length to reduce brute-force success probability.
    if any(len(key) < 32 for key in parsed_keys):
        raise ValueError("API keys must be at least 32 characters long")

    if self.write_mode and not write_repositories:
        raise ValueError("WRITE_MODE=true requires WRITE_REPOSITORY_WHITELIST to be configured")

    return self
@property
def mcp_api_keys(self) -> list[str]:
    """Return a copy of the parsed API keys.

    Returns:
        A new list holding the keys stored under ``_mcp_api_keys``, or an
        empty list when that attribute has not been set. Mutating the
        returned list does not affect the stored keys.
    """
    return list(getattr(self, "_mcp_api_keys", []))
@property
def write_repository_whitelist(self) -> list[str]:
    """Return a copy of the repositories permitted for write-mode operations.

    Returns:
        A new list built from ``_write_repository_whitelist``, or an empty
        list when that attribute has not been set.
    """
    stored = getattr(self, "_write_repository_whitelist", [])
    return list(stored)
@property
def gitea_base_url(self) -> str:
    """Return the Gitea base URL as a string without trailing slashes.

    Returns:
        ``str(self.gitea_url)`` with any trailing ``/`` characters removed.
    """
    return str(self.gitea_url).rstrip("/")
# Global settings instance, created on first use by get_settings().
_settings: Settings | None = None


def get_settings() -> Settings:
    """Return the global settings instance, creating it on first call.

    Returns:
        The module-level ``Settings`` object. It is constructed once and the
        same instance is returned on every later call.
    """
    global _settings
    if _settings is None:
        # Mypy limitation: BaseSettings loads from environment dynamically.
        _settings = Settings()  # type: ignore[call-arg]
    return _settings