Some checks failed
CI/CD Pipeline / Code Quality Checks (push) Failing after 6m9s
CI/CD Pipeline / Security Scanning (push) Successful in 26s
CI/CD Pipeline / Tests (3.11) (push) Failing after 5m24s
CI/CD Pipeline / Tests (3.12) (push) Failing after 5m23s
CI/CD Pipeline / Build Docker Image (push) Has been skipped
CI/CD Pipeline / Deploy to Staging (push) Has been skipped
CI/CD Pipeline / Deploy to Production (push) Has been skipped
CI/CD Pipeline / Notification (push) Successful in 1s
79 lines
2.9 KiB
Python
79 lines
2.9 KiB
Python
"""Authentication helpers for the dashboard."""
|
|
|
|
from typing import Any
|
|
from urllib.parse import urlencode
|
|
|
|
import httpx
|
|
from authlib.integrations.starlette_client import OAuth
|
|
from fastapi import HTTPException, Request, status
|
|
|
|
from guardden.dashboard.config import DashboardSettings
|
|
|
|
|
|
def build_oauth(settings: DashboardSettings) -> OAuth:
|
|
"""Build OAuth client registrations."""
|
|
oauth = OAuth()
|
|
oauth.register(
|
|
name="entra",
|
|
client_id=settings.entra_client_id,
|
|
client_secret=settings.entra_client_secret.get_secret_value(),
|
|
server_metadata_url=(
|
|
"https://login.microsoftonline.com/"
|
|
f"{settings.entra_tenant_id}/v2.0/.well-known/openid-configuration"
|
|
),
|
|
client_kwargs={"scope": "openid profile email"},
|
|
)
|
|
return oauth
|
|
|
|
|
|
def discord_authorize_url(settings: DashboardSettings, state: str) -> str:
|
|
"""Generate the Discord OAuth authorization URL."""
|
|
query = urlencode(
|
|
{
|
|
"client_id": settings.discord_client_id,
|
|
"redirect_uri": settings.callback_url("discord"),
|
|
"response_type": "code",
|
|
"scope": "identify",
|
|
"state": state,
|
|
}
|
|
)
|
|
return f"https://discord.com/oauth2/authorize?{query}"
|
|
|
|
|
|
async def exchange_discord_code(settings: DashboardSettings, code: str) -> dict[str, Any]:
|
|
"""Exchange a Discord OAuth code for a user profile."""
|
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
token_response = await client.post(
|
|
"https://discord.com/api/oauth2/token",
|
|
data={
|
|
"client_id": settings.discord_client_id,
|
|
"client_secret": settings.discord_client_secret.get_secret_value(),
|
|
"grant_type": "authorization_code",
|
|
"code": code,
|
|
"redirect_uri": settings.callback_url("discord"),
|
|
},
|
|
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
|
)
|
|
token_response.raise_for_status()
|
|
token_data = token_response.json()
|
|
|
|
user_response = await client.get(
|
|
"https://discord.com/api/users/@me",
|
|
headers={"Authorization": f"Bearer {token_data['access_token']}"},
|
|
)
|
|
user_response.raise_for_status()
|
|
return user_response.json()
|
|
|
|
|
|
def require_owner(settings: DashboardSettings, request: Request) -> None:
|
|
"""Ensure the current session is the configured owner."""
|
|
session = request.session
|
|
entra_oid = session.get("entra_oid")
|
|
discord_id = session.get("discord_id")
|
|
if not entra_oid or not discord_id:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")
|
|
if str(entra_oid) != settings.owner_entra_object_id:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
|
|
if int(discord_id) != settings.owner_discord_id:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
|