46 lines
1.3 KiB
Python
46 lines
1.3 KiB
Python
import jwt
|
|
from fastapi import HTTPException, Request, status
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
|
|
from ..config import settings
|
|
|
|
security = HTTPBearer(auto_error=False)
|
|
|
|
|
|
def decode_jwt_token(token: str) -> dict:
|
|
"""Decode and validate JWT token"""
|
|
try:
|
|
payload = jwt.decode(
|
|
token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]
|
|
)
|
|
return payload
|
|
except jwt.ExpiredSignatureError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired"
|
|
)
|
|
except jwt.InvalidTokenError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
|
|
)
|
|
|
|
|
|
async def require_auth(request: Request):
|
|
"""Dependency to require authentication"""
|
|
auth_header = request.headers.get("Authorization")
|
|
|
|
if not auth_header:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated"
|
|
)
|
|
|
|
if not auth_header.startswith("Bearer "):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid authorization header",
|
|
)
|
|
|
|
token = auth_header[7:] # Remove "Bearer " prefix
|
|
user = decode_jwt_token(token)
|
|
request.state.user = user
|
|
return user
|