Files
AegisGitea-MCP/src/aegis_gitea_mcp/config.py
latte c79cc1ab9e
Some checks failed
docker / lint (push) Has been cancelled
docker / test (push) Has been cancelled
docker / docker-build (push) Has been cancelled
lint / lint (push) Has been cancelled
test / test (push) Has been cancelled
Add PUBLIC_BASE_URL and refine OAuth scopes
2026-02-25 20:49:08 +01:00

340 lines
12 KiB
Python

"""Configuration management for AegisGitea MCP server."""
from __future__ import annotations
from pathlib import Path
from pydantic import Field, HttpUrl, field_validator, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
_ALLOWED_LOG_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}
_ALLOWED_SECRET_MODES = {"off", "mask", "block"}
_ALLOWED_ENVIRONMENTS = {"development", "staging", "production", "test"}
class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False,
extra="ignore",
env_parse_none_str="null",
)
# Runtime environment
environment: str = Field(
default="production",
description="Runtime environment name",
)
# Gitea configuration
gitea_url: HttpUrl = Field(..., description="Base URL of the Gitea instance")
gitea_token: str = Field(
default="",
description=("Deprecated shared bot token. Not used for MCP tool execution in OAuth mode."),
)
# MCP server configuration
mcp_host: str = Field(
default="127.0.0.1",
description="Host interface to bind MCP server to",
)
mcp_port: int = Field(default=8080, description="Port to bind MCP server to", ge=1, le=65535)
allow_insecure_bind: bool = Field(
default=False,
description="Allow binding to 0.0.0.0 (disabled by default for local hardening)",
)
public_base_url: HttpUrl | None = Field(
default=None,
description=(
"Public externally-reachable base URL for this MCP server. "
"When set, OAuth metadata endpoints use this URL for absolute links."
),
)
# Logging and observability
log_level: str = Field(default="INFO", description="Application logging level")
audit_log_path: Path = Field(
default=Path("/var/log/aegis-mcp/audit.log"),
description="Path to tamper-evident audit log file",
)
metrics_enabled: bool = Field(default=True, description="Enable Prometheus metrics endpoint")
expose_error_details: bool = Field(
default=False,
description="Return internal error details in API responses (disabled by default)",
)
startup_validate_gitea: bool = Field(
default=True,
description="Validate Gitea connectivity during startup",
)
# Security limits
max_file_size_bytes: int = Field(
default=1_048_576,
description="Maximum file size that can be read (bytes)",
ge=1,
)
request_timeout_seconds: int = Field(
default=30,
description="Timeout for Gitea API requests (seconds)",
ge=1,
)
rate_limit_per_minute: int = Field(
default=60,
description="Maximum requests per minute for a single IP",
ge=1,
)
token_rate_limit_per_minute: int = Field(
default=120,
description="Maximum requests per minute per authenticated token",
ge=1,
)
max_tool_response_items: int = Field(
default=200,
description="Maximum list items returned by a tool response",
ge=1,
)
max_tool_response_chars: int = Field(
default=20_000,
description="Maximum characters returned in text fields",
ge=1,
)
secret_detection_mode: str = Field(
default="mask",
description="Secret detection mode: off, mask, or block",
)
# OAuth2 configuration (for ChatGPT per-user Gitea authentication)
oauth_mode: bool = Field(
default=False,
description=(
"Enable per-user OAuth2 authentication mode. "
"When true, each ChatGPT user authenticates with their own Gitea account. "
"GITEA_TOKEN and MCP_API_KEYS are not required in this mode."
),
)
gitea_oauth_client_id: str = Field(
default="",
description="Gitea OAuth2 application client ID (required when oauth_mode=true)",
)
gitea_oauth_client_secret: str = Field(
default="",
description="Gitea OAuth2 application client secret (required when oauth_mode=true)",
)
oauth_expected_audience: str = Field(
default="",
description=(
"Expected OIDC audience for access tokens. "
"Defaults to GITEA_OAUTH_CLIENT_ID when unset."
),
)
oauth_cache_ttl_seconds: int = Field(
default=300,
description="OIDC discovery/JWKS cache TTL in seconds",
ge=30,
)
oauth_resource_documentation: str = Field(
default="https://hiddenden.cafe/docs/mcp-gitea",
description="Public documentation URL for OAuth-protected MCP resource behavior",
)
# Authentication configuration
auth_enabled: bool = Field(
default=True,
description="Enable API key authentication (disable only in controlled testing)",
)
mcp_api_keys_raw: str = Field(
default="",
description="Comma-separated API keys for MCP access",
alias="MCP_API_KEYS",
)
max_auth_failures: int = Field(
default=5,
description="Maximum authentication failures before auth rate limiting",
ge=1,
)
auth_failure_window: int = Field(
default=300,
description="Time window for counting auth failures (seconds)",
ge=1,
)
# Policy and write-mode configuration
policy_file_path: Path = Field(
default=Path("policy.yaml"),
description="Path to YAML authorization policy file",
)
write_mode: bool = Field(default=False, description="Enable write-capable tools")
write_repository_whitelist_raw: str = Field(
default="",
description="Comma-separated repository whitelist for write mode (owner/repo)",
alias="WRITE_REPOSITORY_WHITELIST",
)
write_allow_all_token_repos: bool = Field(
default=False,
description=(
"Allow write-mode operations on any repository the token can access. "
"Disabled by default."
),
)
automation_enabled: bool = Field(
default=False,
description="Enable automation endpoints and workflows",
)
automation_scheduler_enabled: bool = Field(
default=False,
description="Enable built-in scheduled job loop",
)
automation_stale_days: int = Field(
default=30,
description="Number of days before an issue is considered stale",
ge=1,
)
@field_validator("environment")
@classmethod
def validate_environment(cls, value: str) -> str:
"""Validate deployment environment name."""
normalized = value.strip().lower()
if normalized not in _ALLOWED_ENVIRONMENTS:
raise ValueError(f"environment must be one of {_ALLOWED_ENVIRONMENTS}")
return normalized
@field_validator("log_level")
@classmethod
def validate_log_level(cls, value: str) -> str:
"""Validate log level is one of the allowed values."""
normalized = value.upper()
if normalized not in _ALLOWED_LOG_LEVELS:
raise ValueError(f"log_level must be one of {_ALLOWED_LOG_LEVELS}")
return normalized
@field_validator("public_base_url", mode="before")
@classmethod
def normalize_public_base_url(cls, value: object) -> object:
"""Treat empty PUBLIC_BASE_URL as unset."""
if isinstance(value, str) and not value.strip():
return None
return value
@field_validator("gitea_token")
@classmethod
def validate_token_not_empty(cls, value: str) -> str:
"""Validate Gitea token is trimmed (empty string allowed for oauth_mode)."""
cleaned = value.strip()
if value and not cleaned:
raise ValueError("gitea_token cannot be whitespace-only")
return cleaned
@field_validator("secret_detection_mode")
@classmethod
def validate_secret_detection_mode(cls, value: str) -> str:
"""Validate secret detection behavior setting."""
normalized = value.lower().strip()
if normalized not in _ALLOWED_SECRET_MODES:
raise ValueError(f"secret_detection_mode must be one of {_ALLOWED_SECRET_MODES}")
return normalized
@model_validator(mode="after")
def validate_security_constraints(self) -> Settings:
"""Validate cross-field security constraints."""
parsed_keys: list[str] = []
if self.mcp_api_keys_raw.strip():
parsed_keys = [
value.strip() for value in self.mcp_api_keys_raw.split(",") if value.strip()
]
object.__setattr__(self, "_mcp_api_keys", parsed_keys)
write_repositories: list[str] = []
if self.write_repository_whitelist_raw.strip():
write_repositories = [
value.strip()
for value in self.write_repository_whitelist_raw.split(",")
if value.strip()
]
for repository in write_repositories:
if "/" not in repository:
raise ValueError("WRITE_REPOSITORY_WHITELIST entries must be in owner/repo format")
object.__setattr__(self, "_write_repository_whitelist", write_repositories)
# Security decision: binding all interfaces requires explicit opt-in.
if self.mcp_host == "0.0.0.0" and not self.allow_insecure_bind:
raise ValueError(
"Binding to 0.0.0.0 is blocked by default. "
"Set ALLOW_INSECURE_BIND=true to explicitly permit this."
)
if self.oauth_mode:
# In OAuth mode, per-user Gitea tokens are used; no shared bot token or API keys needed.
if not self.gitea_oauth_client_id.strip():
raise ValueError("GITEA_OAUTH_CLIENT_ID is required when OAUTH_MODE=true.")
if not self.gitea_oauth_client_secret.strip():
raise ValueError("GITEA_OAUTH_CLIENT_SECRET is required when OAUTH_MODE=true.")
else:
# Standard API key mode: require bot token and at least one API key.
if not self.gitea_token.strip():
raise ValueError("GITEA_TOKEN is required unless OAUTH_MODE=true.")
if self.auth_enabled and not parsed_keys:
raise ValueError(
"At least one API key must be configured when auth_enabled=True. "
"Set MCP_API_KEYS or disable auth explicitly for controlled testing."
)
# Enforce minimum key length to reduce brute-force success probability.
for key in parsed_keys:
if len(key) < 32:
raise ValueError("API keys must be at least 32 characters long")
if self.write_mode and not self.write_allow_all_token_repos and not write_repositories:
raise ValueError(
"WRITE_MODE=true requires WRITE_REPOSITORY_WHITELIST to be configured "
"unless WRITE_ALLOW_ALL_TOKEN_REPOS=true"
)
return self
@property
def mcp_api_keys(self) -> list[str]:
"""Get parsed list of API keys."""
return list(getattr(self, "_mcp_api_keys", []))
@property
def write_repository_whitelist(self) -> list[str]:
"""Get parsed list of repositories allowed for write-mode operations."""
return list(getattr(self, "_write_repository_whitelist", []))
@property
def gitea_base_url(self) -> str:
"""Get Gitea base URL as normalized string."""
return str(self.gitea_url).rstrip("/")
@property
def public_base(self) -> str | None:
"""Get normalized public base URL when explicitly configured."""
if self.public_base_url is None:
return None
return str(self.public_base_url).rstrip("/")
_settings: Settings | None = None
def get_settings() -> Settings:
"""Get or create global settings instance."""
global _settings
if _settings is None:
# Mypy limitation: BaseSettings loads from environment dynamically.
_settings = Settings() # type: ignore[call-arg]
return _settings
def reset_settings() -> None:
"""Reset global settings instance (primarily for testing)."""
global _settings
_settings = None