111 lines
No EOL
3.4 KiB
Python
111 lines
No EOL
3.4 KiB
Python
from uuid import UUID
|
|
from fastapi import APIRouter, HTTPException
|
|
from fastapi.params import Depends
|
|
from sqlalchemy.orm import Session as DbSession
|
|
|
|
from app.database import get_db
|
|
from app.schemas.session import Session
|
|
from app.schemas.auth_token import AccessToken
|
|
from app.schemas.user import PasswordUpdate, UserUpdate, User
|
|
from app.security.jwt_bearer import JWTBearer
|
|
from app.services import session_service, user_service
|
|
from app.util.errors import InvalidStateError, NotFoundError
|
|
|
|
# Router for the endpoints of this module; its routes are mounted under the
# "/me" prefix and tagged "Me (v1)".
router = APIRouter(prefix="/me", tags=["Me (v1)"])
|
|
|
|
|
|
@router.get(path="", response_model=User)
async def get_myself(
    db: DbSession = Depends(get_db),
    token: AccessToken = Depends(JWTBearer()),
):
    """
    Return the user identified by the subject of the presented access token.

    Responds with 404 ``user_not_found`` when the lookup yields no user.
    """
    current_user_id = UUID(token.subject)
    found = await user_service.get_user(db=db, id=current_user_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="user_not_found")
|
|
|
|
|
|
@router.patch(path="", response_model=User)
async def update_myself(
    user_update: UserUpdate,
    db: DbSession = Depends(get_db),
    token: AccessToken = Depends(JWTBearer()),
):
    """
    Update the currently authenticated user. Changing the email address
    automatically marks it as not verified and starts a new verification
    workflow.

    Responds with the result of the update, or with 404 ``user_not_found``
    when the user service raises ``NotFoundError``.
    """
    # Converted outside the try block: only the service call is expected to
    # raise NotFoundError.
    user_id = UUID(token.subject)
    try:
        return await user_service.update_user(db, user_id, user_update)
    except NotFoundError as err:
        # Chain explicitly so the service error stays attached as __cause__.
        raise HTTPException(status_code=404, detail="user_not_found") from err
|
|
|
|
|
|
@router.post(path="/password", response_model=list[None])
async def change_password(
    update: PasswordUpdate,
    db: DbSession = Depends(get_db),
    token: AccessToken = Depends(JWTBearer()),
):
    """
    Change the password of the currently authenticated user.

    Responds with an empty list on success, with 404 ``user_not_found`` when
    the user service raises ``NotFoundError`` and with 409
    ``incorrect_password`` when it raises ``InvalidStateError``.
    """
    user_id = UUID(token.subject)
    try:
        # Only the service call can raise the errors handled below.
        await user_service.change_user_password(
            db=db, id=user_id, update=update
        )
    except NotFoundError as err:
        raise HTTPException(status_code=404, detail="user_not_found") from err
    except InvalidStateError as err:
        raise HTTPException(
            status_code=409, detail="incorrect_password"
        ) from err
    # The declared response model is list[None]; success carries no payload.
    return []
|
|
|
|
|
|
@router.get(path="/sessions", response_model=list[Session])
async def get_user_sessions(
    db: DbSession = Depends(get_db),
    token: AccessToken = Depends(JWTBearer()),
):
    """
    Return the active sessions belonging to the currently authenticated user.
    """
    owner_id = UUID(token.subject)
    sessions = await session_service.get_sessions_by_user(
        db=db, user_id=owner_id
    )
    return sessions
|
|
|
|
|
|
@router.delete(path="/sessions", response_model=list[None])
async def clear_user_sessions(
    db: DbSession = Depends(get_db),
    token: AccessToken = Depends(JWTBearer()),
):
    """
    Remove every session of the currently authenticated user.

    Always responds with an empty list.
    """
    owner_id = UUID(token.subject)
    await session_service.remove_all_sessions_for_user(db=db, user_id=owner_id)
    return []
|
|
|
|
|
|
@router.delete(path="/sessions/{session_id}", response_model=list[None])
async def delete_user_session(
    session_id: UUID,
    db: DbSession = Depends(get_db),
    token: AccessToken = Depends(JWTBearer()),
):
    """
    Invalidate a specific session of the currently authenticated user.

    Responds with an empty list on success, or with 404 ``session_not_found``
    when the session service raises ``NotFoundError``.
    """
    user_id = UUID(token.subject)
    try:
        await session_service.remove_session_for_user(
            db=db,
            id=session_id,
            user_id=user_id,
        )
    except NotFoundError as err:
        # Chain explicitly so the service error stays attached as __cause__.
        raise HTTPException(
            status_code=404, detail="session_not_found"
        ) from err
    # The declared response model is list[None]; success carries no payload.
    return []