# simple-ocpp-cs/app/ocpp_proto/chargepoint_manager.py
#
# 33 lines
# 907 B
# Python

import logging
from typing import Any, Coroutine, Dict
from uuid import UUID
from websockets import ConnectionClosed
from app.ocpp_proto.chargepoint import ChargePoint
# Registry of charge points, keyed by the id passed to start(). The functions
# in this module add, look up and remove entries.
__active_connections: Dict[UUID, ChargePoint] = {}
async def start(id: UUID, cp: ChargePoint):
    """Register a charge point and await its ``start()`` coroutine.

    The charge point is stored in the module-level registry under ``id``
    before ``cp.start()`` is awaited and is removed again as soon as that
    await finishes, whatever the reason, so the registry never keeps an
    entry for a charge point whose ``start()`` has already ended.

    Args:
        id: Key under which the charge point is registered.
        cp: Charge point whose ``start()`` coroutine is awaited.

    Raises:
        Any exception raised by ``cp.start()`` other than
        ``ConnectionClosed``, which is logged and swallowed.
    """
    __active_connections[id] = cp
    try:
        await cp.start()
    except ConnectionClosed:
        logging.info("Charging station '%s' (%s) disconnected", cp.id, id)
    finally:
        # Remove the entry on every exit path (disconnect, unexpected error,
        # task cancellation), not only on ConnectionClosed; otherwise a stale
        # entry would make the station look connected forever.
        __active_connections.pop(id, None)
async def call(
    chargepoint_id: UUID,
    payload: Any,
    suppress: bool = True,
    unique_id: Any | None = None
) -> Any | None:
    """Forward a call to the registered charge point and return its result.

    Args:
        chargepoint_id: Registry key of the target charge point.
        payload: Passed unchanged as the first argument of ``cp.call``.
        suppress: Passed unchanged as the second argument of ``cp.call``.
        unique_id: Passed unchanged as the third argument of ``cp.call``.

    Returns:
        Whatever awaiting ``cp.call(...)`` yields.

    Raises:
        KeyError: If no charge point is registered under ``chargepoint_id``.
    """
    # A missing id raises KeyError straight from the lookup; no handler is
    # needed just to re-raise it.
    cp = __active_connections[chargepoint_id]
    return await cp.call(payload, suppress, unique_id)
def is_connected(chargepoint_id: UUID) -> bool:
    """Return whether ``chargepoint_id`` is present in the connection registry."""
    # Membership on the dict itself tests its keys; no .keys() view needed.
    return chargepoint_id in __active_connections