Reafctor app

This commit is contained in:
Oliver Traber 2024-04-13 22:43:03 +02:00
parent b8216f6ade
commit 7740be8bb5
Signed by: Bluemedia
GPG key ID: C0674B105057136C
36 changed files with 389 additions and 208 deletions

0
app/__init__.py Normal file
View file

24
app/database.py Normal file
View file

@ -0,0 +1,24 @@
import os
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = os.getenv("CS_DATABASE_URL", "sqlite:///./simple-ocpp-cs.db")
if SQLALCHEMY_DATABASE_URL.startswith("sqlite"):
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
else:
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

35
app/main.py Normal file
View file

@ -0,0 +1,35 @@
from fastapi import FastAPI
from starlette.middleware.authentication import AuthenticationMiddleware
import uvicorn
from app.database import engine, Base
from app.models import *
from app.routers import chargepoint_v1, ocpp_v1
from app.util.websocket_auth_backend import BasicAuthBackend
Base.metadata.create_all(bind=engine)
def create_ocpp_app():
app_ocpp = FastAPI(
responses={404: {"description": "Not found"}},
)
app_ocpp.include_router(ocpp_v1.router)
app_ocpp.add_middleware(AuthenticationMiddleware, backend=BasicAuthBackend())
return app_ocpp
def create_app():
app = FastAPI(
responses={404: {"description": "Not found"}},
)
app.include_router(chargepoint_v1.router, prefix="/v1")
app.mount(path="/v1/ocpp", app=create_ocpp_app())
return app
app = create_app()
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")

8
app/models/__init__.py Normal file
View file

@ -0,0 +1,8 @@
__all__ = [
"chargepoint",
"connector",
"id_token",
"meter_value",
"transaction",
"user"
]

16
app/models/chargepoint.py Normal file
View file

@ -0,0 +1,16 @@
import uuid
from sqlalchemy import Uuid, Boolean, Column, DateTime, String
from sqlalchemy.orm import relationship
from app.database import Base
class ChargePoint(Base):
__tablename__ = "chargepoints"
id = Column(Uuid, primary_key=True, default=uuid.uuid4)
friendly_name = Column(String, unique=True, index=True)
is_active = Column(Boolean, default=True)
password = Column(String)
last_seen = Column(DateTime, nullable=True)
connectors = relationship("Connector", cascade="delete, delete-orphan")

15
app/models/connector.py Normal file
View file

@ -0,0 +1,15 @@
import uuid
from sqlalchemy import Uuid, Column, Enum, ForeignKey, Integer
from app.schemas.connector import ConnectorStatus
from app.database import Base
class Connector(Base):
__tablename__ = "connectors"
id = Column(Uuid, primary_key=True, default=uuid.uuid4)
evse = Column(Integer)
index = Column(Integer)
status = Column(Enum(ConnectorStatus))
chargepoint_id = Column(Uuid, ForeignKey("chargepoints.id"))

16
app/models/id_token.py Normal file
View file

@ -0,0 +1,16 @@
import uuid
from sqlalchemy import Uuid, Boolean, Column, ForeignKey, String
from sqlalchemy.orm import relationship
from app.database import Base
class IdToken(Base):
__tablename__ = "id_tokens"
id = Column(Uuid, primary_key=True, default=uuid.uuid4)
title = Column(String)
is_active = Column(Boolean, default=True)
token = Column(String, index=True)
owner_id = Column(Uuid, ForeignKey("users.id"))
owner = relationship("User", back_populates="id_tokens")

17
app/models/meter_value.py Normal file
View file

@ -0,0 +1,17 @@
import uuid
from sqlalchemy import Uuid, Column, DateTime, Enum, Float, ForeignKey, String
from app.database import Base
from app.schemas.meter_value import Measurand, PhaseType
class Transaction(Base):
__tablename__ = "meter_values"
id = Column(Uuid, primary_key=True, default=uuid.uuid4)
timestamp = Column(DateTime, index=True)
measurand = Column(Enum(Measurand))
phase_type = Column(Enum(PhaseType))
unit = Column(String)
value = Column(Float)
transaction_id = Column(Uuid, ForeignKey("transactions.id"), index=True)

20
app/models/transaction.py Normal file
View file

@ -0,0 +1,20 @@
import uuid
from sqlalchemy import Uuid, Column, DateTime, Enum, Float, ForeignKey
from sqlalchemy.orm import relationship, backref
from app.schemas.transaction import TransactionEventTriggerReason, TransactionStatus
from app.database import Base
class Transaction(Base):
__tablename__ = "transactions"
id = Column(Uuid, primary_key=True, default=uuid.uuid4)
status = Column(Enum(TransactionStatus), index=True)
started_at = Column(DateTime, index=True)
ended_at = Column(DateTime, nullable=True, index=True)
meter_start = Column(Float)
meter_end = Column(Float, nullable=True)
end_reason = Column(Enum(TransactionEventTriggerReason))
connector_id = Column(Uuid, ForeignKey("connectors.id"), index=True)
id_token_id = Column(Uuid, ForeignKey("id_tokens.id"), index= True)

14
app/models/user.py Normal file
View file

@ -0,0 +1,14 @@
import uuid
from sqlalchemy import Uuid, Boolean, Column, String
from sqlalchemy.orm import relationship
from app.database import Base
class User(Base):
__tablename__ = "users"
id = Column(Uuid, primary_key=True, default=uuid.uuid4)
friendly_name = Column(String, unique=True, index=True)
is_active = Column(Boolean, default=True)
id_tokens = relationship("IdToken", back_populates="owner", cascade="delete, delete-orphan")

View file

View file

@ -0,0 +1,88 @@
from datetime import datetime, UTC
import os
from ocpp.routing import on
from ocpp.v201 import ChargePoint as cp
from ocpp.v201 import call_result
from ocpp.v201.enums import Action, RegistrationStatusType, AuthorizationStatusType
from app.database import SessionLocal
from app.models.chargepoint import ChargePoint
from app.models.connector import Connector
from app.models.id_token import IdToken
from app.schemas.connector import ConnectorStatus
class ChargePoint(cp):
@on(Action.BootNotification)
async def on_boot_notification(self, charging_station, reason, **kwargs):
with SessionLocal() as db:
db_chargepoint = db.query(ChargePoint).filter(ChargePoint.friendly_name == self.id).first()
db_chargepoint.last_seen = datetime.now(UTC)
db.commit()
return call_result.BootNotificationPayload(
current_time=datetime.now(UTC).isoformat(),
interval=int(os.getenv("CS_HEARTBEAT_INTERVAL", "1800")),
status=RegistrationStatusType.accepted
)
@on(Action.Heartbeat)
async def on_heartbeat_request(self):
with SessionLocal() as db:
db_chargepoint = db.query(ChargePoint).filter(ChargePoint.friendly_name == self.id).first()
db_chargepoint.last_seen = datetime.now(UTC)
db.commit()
return call_result.HeartbeatPayload(
current_time=datetime.now(UTC).isoformat()
)
@on(Action.StatusNotification)
async def on_status_notification(self, evse_id: int, connector_id: int, connector_status: str, **kwargs):
with SessionLocal() as db:
db_chargepoint = db.query(ChargePoint).filter(ChargePoint.friendly_name == self.id).first()
db_chargepoint.last_seen = datetime.now(UTC)
db_connector = db.query(Connector).filter(
Connector.chargepoint_id == db_chargepoint.id,
Connector.evse == evse_id,
Connector.index == connector_id
).first()
if db_connector == None:
db_connector = Connector(
chargepoint_id = db_chargepoint.id,
evse = evse_id,
index = connector_id,
status = ConnectorStatus(connector_status)
)
db.add(db_connector)
else:
db_connector.status = ConnectorStatus(connector_status)
db.commit()
return call_result.StatusNotificationPayload()
@on(Action.Authorize)
async def on_authorize(self, id_token, **kwargs):
if id_token == None:
return call_result.AuthorizePayload(id_token_info={'status': AuthorizationStatusType.invalid})
if id_token.type != "ISO14443" | "ISO15693":
return call_result.AuthorizePayload(id_token_info={'status': AuthorizationStatusType.invalid})
with SessionLocal() as db:
db_chargepoint = db.query(ChargePoint).filter(ChargePoint.friendly_name == self.id).first()
db_chargepoint.last_seen = datetime.now(UTC)
db_id_token = db.query(IdToken).filter(IdToken.token == id_token.id).first()
db.commit()
if db_id_token == None:
return call_result.AuthorizePayload(id_token_info={'status': AuthorizationStatusType.unknown})
if db_id_token.is_active == False:
return call_result.AuthorizePayload(id_token_info={'status': AuthorizationStatusType.blocked})
return call_result.AuthorizePayload(id_token_info={'status': AuthorizationStatusType.accepted, 'groupIdToken': str(db_id_token.owner_id)})
@on(Action.TransactionEvent)
async def on_transaction_event(self):
return call_result.TransactionEventPayload()

View file

@ -0,0 +1,32 @@
import logging
from typing import Any, Coroutine, Dict
from uuid import UUID
from websockets import ConnectionClosed
from app.ocpp_proto.chargepoint import ChargePoint
__active_connections: Dict[UUID, ChargePoint] = {}
async def start(id: UUID, cp: ChargePoint):
try:
__active_connections[id] = cp
await cp.start()
except ConnectionClosed:
logging.info("Charging station '%s' (%s) disconnected", cp.id, id)
__active_connections.pop(id, None)
async def call(
chargepoint_id: UUID,
payload: Any,
suppress: bool = True,
unique_id: Any | None = None
) -> Coroutine[Any, Any, Any | None]:
try:
cp = __active_connections[chargepoint_id]
return cp.call(payload, suppress, unique_id)
except KeyError as e:
raise e
def is_connected(chargepoint_id: UUID):
return chargepoint_id in __active_connections.keys()

0
app/routers/__init__.py Normal file
View file

View file

@ -0,0 +1,114 @@
import random
import string
from uuid import UUID
from fastapi import APIRouter, HTTPException, Security
from fastapi.params import Depends
from sqlalchemy.orm import Session
from app.database import get_db
from app.ocpp_proto import chargepoint_manager
from app.schemas.chargepoint import ChargePoint, ChargePointCreate, ChargePointUpdate, ChargePointPassword, ChargePointConnectionInfo
from app.models.chargepoint import ChargePoint as DBChargePoint
from app.security import get_api_key
router = APIRouter(
prefix="/chargepoint",
tags=["chargepoint"],
)
@router.get(path="/", response_model=list[ChargePoint])
async def get_chargepoints(
skip: int = 0,
limit: int = 20,
api_key: str = Security(get_api_key),
db: Session = Depends(get_db)
):
return db.query(DBChargePoint).offset(skip).limit(limit).all()
@router.get(path="/{chargepoint_id}", response_model=ChargePoint)
async def get_chargepoint(
chargepoint_id: UUID,
api_key: str = Security(get_api_key),
db: Session = Depends(get_db)
):
chargepoint = db.query(DBChargePoint).filter(DBChargePoint.id == chargepoint_id).first()
if chargepoint is None:
raise HTTPException(status_code=404, detail="Chargepoint not found")
return chargepoint
@router.get(path="/{chargepoint_id}/password", response_model=ChargePointPassword)
async def get_chargepoint_password(
chargepoint_id: UUID,
api_key: str = Security(get_api_key),
db: Session = Depends(get_db)
):
chargepoint = db.query(DBChargePoint).filter(DBChargePoint.id == chargepoint_id).first()
if chargepoint is None:
raise HTTPException(status_code=404, detail="Chargepoint not found")
return ChargePointPassword(password=chargepoint.password)
@router.delete(path="/{chargepoint_id}/password", response_model=ChargePointPassword)
async def reset_chargepoint_password(
chargepoint_id: UUID,
api_key: str = Security(get_api_key),
db: Session = Depends(get_db)
):
chargepoint = db.query(DBChargePoint).filter(DBChargePoint.id == chargepoint_id).first()
if chargepoint is None:
raise HTTPException(status_code=404, detail="Chargepoint not found")
chargepoint.password = ''.join(random.choice(string.ascii_letters + string.digits) for i in range(24))
db.commit()
return ChargePointPassword(password=chargepoint.password)
@router.post(path="/", response_model=ChargePoint)
async def create_chargepoint(
chargepoint: ChargePointCreate,
api_key: str = Security(get_api_key),
db: Session = Depends(get_db)
):
chargepoint_db = DBChargePoint(
friendly_name=chargepoint.friendly_name,
is_active=chargepoint.is_active,
password=''.join(random.choice(string.ascii_letters + string.digits) for i in range(24))
)
db.add(chargepoint_db)
db.commit()
db.refresh(chargepoint_db)
return chargepoint_db
@router.patch(path="/{chargepoint_id}", response_model=ChargePoint)
async def update_chargepoint(
chargepoint_id: UUID,
chargepoint_update: ChargePointUpdate,
api_key: str = Security(get_api_key),
db: Session = Depends(get_db)
):
chargepoint = db.query(DBChargePoint).filter(DBChargePoint.id == chargepoint_id).first()
if chargepoint is None:
raise HTTPException(status_code=404, detail="Chargepoint not found")
for key, value in chargepoint_update.model_dump(exclude_unset=True).items():
setattr(chargepoint, key, value)
db.commit()
return chargepoint
@router.delete(path="/{chargepoint_id}", response_model=[])
async def delete_chargepoint(
chargepoint_id: UUID,
api_key: str = Security(get_api_key),
db: Session = Depends(get_db)
):
chargepoint = db.query(DBChargePoint).filter(DBChargePoint.id == chargepoint_id).first()
if chargepoint is None:
raise HTTPException(status_code=404, detail="Chargepoint not found")
db.delete(chargepoint)
db.commit()
return []
@router.get(path="/{chargepoint_id}/status", response_model=ChargePointConnectionInfo)
async def get_chargepoint_status(
chargepoint_id: UUID,
api_key: str = Security(get_api_key)
):
return ChargePointConnectionInfo(
connected=chargepoint_manager.is_connected(chargepoint_id)
)

47
app/routers/ocpp_v1.py Normal file
View file

@ -0,0 +1,47 @@
import logging
from fastapi import APIRouter, WebSocket, WebSocketException
from app.ocpp_proto import chargepoint_manager
from app.ocpp_proto.chargepoint import ChargePoint
from app.util.websocket_wrapper import WebSocketWrapper
router = APIRouter()
@router.websocket("/{charging_station_friendly_name}")
async def websocket_endpoint(
*,
websocket: WebSocket,
charging_station_friendly_name: str,
):
""" For every new charging station that connects, create a ChargePoint
instance and start listening for messages.
"""
if (websocket.user.friendly_name != charging_station_friendly_name):
raise WebSocketException(code=1008, reason="Username doesn't match chargepoint identifier")
logging.info("Charging station '%s' (%s) connected", charging_station_friendly_name, websocket.user.id)
# Check protocols
try:
requested_protocols = websocket.headers['sec-websocket-protocol']
logging.info("Protocols advertised by charging station: %s", requested_protocols)
except KeyError:
logging.warning("Charging station hasn't advertised any subprotocol. "
"Closing Connection")
return await websocket.close()
if "ocpp2.0.1" in requested_protocols:
logging.info("Matched supported protocol: ocpp2.0.1")
else:
logging.warning('Protocols mismatched | Expected subprotocols: %s,'
' but client supports %s | Closing connection',
"ocpp2.0.1",
requested_protocols)
await websocket.accept()
await websocket.close()
return
# Accept connection and begin communication
await websocket.accept(subprotocol="ocpp2.0.1")
cp = ChargePoint(charging_station_friendly_name, WebSocketWrapper(websocket))
await chargepoint_manager.start(websocket.user.id, cp)

0
app/schemas/__init__.py Normal file
View file

View file

@ -0,0 +1,31 @@
from datetime import datetime
from typing import Optional
from uuid import UUID
from pydantic import BaseModel
from app.schemas.connector import Connector
class ChargePointBase(BaseModel):
friendly_name: str
is_active: bool
class ChargePointUpdate(BaseModel):
friendly_name: Optional[str] = None
is_active: Optional[bool] = None
class ChargePointCreate(ChargePointBase):
pass
class ChargePoint(ChargePointBase):
id: UUID
last_seen: datetime | None
connectors: list[Connector] = []
class Config:
from_attributes = True
class ChargePointPassword(BaseModel):
password: str
class ChargePointConnectionInfo(BaseModel):
connected: bool

19
app/schemas/connector.py Normal file
View file

@ -0,0 +1,19 @@
import enum
from uuid import UUID
from pydantic import BaseModel
class ConnectorStatus(enum.Enum):
AVAILABLE = "Available"
OCCUPIED = "Occupied"
RESERVED = "Reserved"
UNAVAILABLE = "Unavailable"
FAULTED = "Faulted"
class Connector(BaseModel):
id: UUID
evse: int
index: int
status: ConnectorStatus
class Config:
from_attributes = True

18
app/schemas/id_token.py Normal file
View file

@ -0,0 +1,18 @@
from uuid import UUID
from pydantic import BaseModel
from app.schemas.user import User
class IdTokenBase(BaseModel):
title: str
is_active: bool
class IdTokenCreate(IdTokenBase):
pass
class IdToken(IdTokenBase):
id: UUID
owner: User
class Config:
from_attributes = True

View file

@ -0,0 +1,55 @@
from datetime import datetime
import enum
from uuid import UUID
from pydantic import BaseModel
class PhaseType(enum.Enum):
L1 = "L1"
L2 = "L2"
L3 = "L3"
N = "N"
L1_N = "L1-N"
L2_N = "L2-N"
L3_N = "L3-N"
L1_L2 = "L1-L2"
L2_L3 = "L2-L3"
L3_L1 = "L3-L1"
class Measurand(enum.Enum):
CURRENT_EXPORT = "Current.Export"
CURRENT_IMPORT = "Current.Import"
CURRENT_OFFERED = "Current.Offered"
ENERGY_ACTIVE_NET = "Energy.Active.Net"
ENERGY_ACTIVE_EXPORT_REGISTER = "Energy.Active.Export.Register"
ENERGY_ACTIVE_IMPORT_REGISTER = "Energy.Active.Import.Register"
ENERGY_ACTIVE_EXPORT_INTERVAL = "Energy.Active.Export.Interval"
ENERGY_ACTIVE_IMPORT_INTERVAL = "Energy.Active.Import.Interval"
ENERGY_REACTIVE_NET = "Energy.Reactive.Net"
ENERGY_REACTIVE_EXPORT_REGISTER = "Energy.Reactive.Export.Register"
ENERGY_REACTIVE_IMPORT_REGISTER = "Energy.Reactive.Import.Register"
ENERGY_REACTIVE_EXPORT_INTERVAL = "Energy.Reactive.Export.Interval"
ENERGY_REACTIVE_IMPORT_INTERVAL = "Energy.Reactive.Import.Interval"
ENERGY_APPARENT_NET = "Energy.Apparent.Net"
ENERGY_APPARENT_IMPORT = "Energy.Apparent.Import"
ENERGY_APPARENT_EXPORT = "Energy.Apparent.Export"
FREQUENCY = "Frequency"
POWER_ACTIVE_EXPORT = "Power.Active.Export"
POWER_ACTIVE_IMPORT = "Power.Active.Import"
POWER_FACTOR = "Power.Factor"
POWER_OFFERED = "Power.Offered"
POWER_REACTIVE_EXPORT = "Power.Reactive.Export"
POWER_REACTIVE_IMPORT = "Power.Reactive.Import"
SOC = "SoC"
VOLTAGE = "Voltage"
class MeterValue(BaseModel):
id: UUID
timestamp: datetime
measurand: Measurand
phase_type: PhaseType
unit: str
value: float
transaction_id: str
class Config:
from_attributes = True

View file

@ -0,0 +1,45 @@
from datetime import datetime
import enum
from uuid import UUID
from pydantic import BaseModel
class TransactionStatus(enum.Enum):
ONGOING = "ongoing"
ENDED = "ended"
class TransactionEventTriggerReason(enum.Enum):
AUTHORIZED = "Authorized"
CABLE_PLUGGED_IN = "CablePluggedIn"
CHARGING_RATE_CHANGED = "ChargingRateChanged"
CHARGING_STATE_CHANGED = "ChargingStateChanged"
DEAUTHORIZED = "Deauthorized"
ENERGY_LIMIT_REACHED = "EnergyLimitReached"
EV_COMMUNICATION_LOST = "EVCommunicationLost"
EV_CONNECT_TIMEOUT = "EVConnectTimeout"
METER_VALUE_CLOCK = "MeterValueClock"
METER_VALUE_PERIODIC = "MeterValuePeriodic"
TIME_LIMIT_REACHED = "TimeLimitReached"
TRIGGER = "Trigger"
UNLOCK_COMMAND = "UnlockCommand"
STOP_AUTHORIZED = "StopAuthorized"
EV_DEPARTED = "EVDeparted"
EV_DETECTED = "EVDetected"
REMOTE_STOP = "RemoteStop"
REMOTE_START = "RemoteStart"
ABNORMAL_CONDITION = "AbnormalCondition"
SIGNED_DATA_RECEIVED = "SignedDataReceived"
RESET_COMMAND = "ResetCommand"
class Transaction(BaseModel):
id: UUID
status: TransactionStatus
started_at: datetime
ended_at: datetime
meter_start: float
meter_end: float
end_reason: TransactionEventTriggerReason
connector_id: str
id_token_id: str
class Config:
from_attributes = True

18
app/schemas/user.py Normal file
View file

@ -0,0 +1,18 @@
from uuid import UUID
from pydantic import BaseModel
from app.schemas.id_token import IdToken
class UserBase(BaseModel):
friendly_name: str
is_active: bool
class UserCreate(UserBase):
pass
class User(UserBase):
id: UUID
id_tokens: list[IdToken] = []
class Config:
from_attributes = True

34
app/security.py Normal file
View file

@ -0,0 +1,34 @@
import os
from fastapi import HTTPException, Security, status
from fastapi.security import APIKeyHeader, HTTPBasic
basic_auth = HTTPBasic()
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
def get_api_key(
api_key_header: str = Security(api_key_header),
) -> str:
"""Retrieve and validate an API key from the HTTP header.
Args:
api_key_header: The API key passed in the HTTP header.
Returns:
The validated API key.
Raises:
HTTPException: If the API key is invalid or missing.
"""
api_key = os.getenv("CS_API_KEY", "default")
if api_key == "default":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="API key not set. Authentication not possible.",
)
if api_key_header == api_key:
return api_key_header
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or missing API Key",
)

0
app/util/__init__.py Normal file
View file

View file

@ -0,0 +1,38 @@
import base64
import binascii
from uuid import UUID
from starlette.authentication import (
AuthCredentials, AuthenticationBackend, AuthenticationError, SimpleUser
)
from app.database import SessionLocal
from app.models.chargepoint import ChargePoint
class BasicAuthBackend(AuthenticationBackend):
async def authenticate(self, conn):
if "Authorization" not in conn.headers:
raise AuthenticationError('No Authorization header provided')
auth = conn.headers["Authorization"]
try:
scheme, credentials = auth.split()
if scheme.lower() != 'basic':
raise AuthenticationError('Invalid authorization scheme')
decoded = base64.b64decode(credentials).decode("ascii")
except (ValueError, UnicodeDecodeError, binascii.Error) as exc:
raise AuthenticationError('Invalid basic auth credentials')
username, _, password = decoded.partition(":")
try:
id = UUID(username)
except (ValueError) as exc:
raise AuthenticationError('Invalid basic auth credentials')
with SessionLocal() as db:
chargepoint = db.query(ChargePoint).filter(ChargePoint.id == id).first()
if chargepoint is None:
raise AuthenticationError('Invalid basic auth credentials')
if chargepoint.password != password:
raise AuthenticationError('Invalid basic auth credentials')
return AuthCredentials(["authenticated"]), chargepoint

View file

@ -0,0 +1,19 @@
from fastapi import WebSocket, WebSocketDisconnect
from websockets import ConnectionClosed
# Wrapper to transform a FastAPI websocket to a standard websocket
class WebSocketWrapper():
def __init__(self, websocket: WebSocket):
self._websocket = websocket
async def recv(self) -> str:
try:
return await self._websocket.receive_text()
except WebSocketDisconnect as e:
raise ConnectionClosed(e.code, 'WebSocketWrapper')
async def send(self, msg: str) -> None:
await self._websocket.send_text(msg)
async def close(self, code: int, reason: str) -> None:
await self._websocket.close(code)