from typing import Optional from fastapi import Request, HTTPException from fastapi.params import Depends from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.ext.asyncio import AsyncSession from app.database import get_db from app.services import token_service from app.util.errors import InsufficientPermissionsError, InvalidTokenAudienceError from app.schemas.auth_token import AccessToken class JWTBearer(HTTPBearer): __required_roles: list[str] | None def __init__( self, required_roles: Optional[list[str]] = None, auto_error: bool = True ): self.__required_roles = required_roles super(JWTBearer, self).__init__(auto_error=auto_error) async def __call__( self, request: Request, db: AsyncSession = Depends(get_db) ) -> AccessToken: credentials: HTTPAuthorizationCredentials | None = await super( JWTBearer, self ).__call__(request) if credentials: if not credentials.scheme == "Bearer": raise HTTPException( status_code=403, detail="authentication_scheme_invalid" ) try: token = await token_service.verify_access_token( credentials.credentials, self.__required_roles ) if not token: raise HTTPException( status_code=403, detail="token_invalid_or_expired" ) return token except InsufficientPermissionsError: raise HTTPException(status_code=403, detail="insufficient_permissions") except InvalidTokenAudienceError: raise HTTPException(status_code=403, detail="invalid_token_audience") else: raise HTTPException(status_code=403, detail="authorization_code_invalid")