simple-ocpp-cs/app/util/websocket_wrapper.py
2024-04-13 22:43:03 +02:00

19 lines
665 B
Python

from fastapi import WebSocket, WebSocketDisconnect
from websockets import ConnectionClosed
# Wrapper exposing a FastAPI WebSocket through the recv/send/close
# coroutine interface of a `websockets`-style connection.
class WebSocketWrapper():
    """Adapt a FastAPI ``WebSocket`` to a ``recv``/``send``/``close`` interface.

    Text frames are read and written through the wrapped socket, and a
    FastAPI ``WebSocketDisconnect`` raised while receiving is re-raised as
    ``websockets.ConnectionClosed``.
    """

    def __init__(self, websocket: "WebSocket") -> None:
        """Store the FastAPI websocket that all calls are delegated to."""
        self._websocket = websocket

    async def recv(self) -> str:
        """Return the next text message received on the wrapped websocket.

        Raises:
            ConnectionClosed: if the wrapped websocket raises
                ``WebSocketDisconnect``; the disconnect code is passed on.
        """
        try:
            return await self._websocket.receive_text()
        except WebSocketDisconnect as e:
            # NOTE(review): the positional (code, reason) form matches older
            # `websockets` releases; newer releases appear to expect close
            # frame objects here -- confirm against the pinned version.
            # Chain the original exception so the traceback keeps the cause.
            raise ConnectionClosed(e.code, 'WebSocketWrapper') from e

    async def send(self, msg: str) -> None:
        """Send ``msg`` as a text message on the wrapped websocket."""
        await self._websocket.send_text(msg)

    async def close(self, code: int = 1000, reason: str = '') -> None:
        """Close the wrapped websocket with the given close code and reason.

        Args:
            code: WebSocket close code; defaults to 1000.
            reason: Close reason forwarded to the wrapped websocket
                (previously accepted but dropped).
        """
        await self._websocket.close(code=code, reason=reason)