simple-ocpp-cs/src/ocpp_proto/chargepoint_manager.py

32 lines
859 B
Python
Raw Normal View History

2024-03-28 21:23:25 +01:00
import logging
from typing import Any, Coroutine, Dict
from websockets import ConnectionClosed
from ocpp_proto.chargepoint import ChargePoint
# Charge points registered by start(), keyed by their id. call() and
# is_connected() look stations up here; start() removes the entry again
# once the station disconnects.
__active_connections: Dict[str, ChargePoint] = {}
async def start(cp: ChargePoint):
    """Register a charge point and await its ``start()`` coroutine.

    The charge point is stored in the module-level registry under ``cp.id``
    before ``cp.start()`` is awaited, so ``call`` and ``is_connected`` can
    find it for as long as ``cp.start()`` is running.

    Args:
        cp: The charge point to register and run.

    Raises:
        Exception: Anything other than ``ConnectionClosed`` raised by
            ``cp.start()`` propagates to the caller. ``ConnectionClosed``
            itself is logged at INFO level and swallowed.
    """
    __active_connections[cp.id] = cp
    try:
        await cp.start()
    except ConnectionClosed:
        logging.info("Charging station '%s' disconnected", cp.id)
    finally:
        # Deregister on every exit path (disconnect, normal return, other
        # errors, task cancellation) so the registry never keeps a stale
        # entry that would make is_connected() report a dead station.
        __active_connections.pop(cp.id, None)
async def call(
    chargepoint_id: str,
    payload: Any,
    suppress: bool = True,
    unique_id: Any | None = None
) -> Coroutine[Any, Any, Any | None]:
    """Forward a call to the registered charge point with the given id.

    Args:
        chargepoint_id: Registry key of the target charge point.
        payload: Passed unchanged as the first argument of the charge
            point's ``call`` method.
        suppress: Passed unchanged as the second argument.
        unique_id: Passed unchanged as the third argument.

    Returns:
        The object returned by the charge point's ``call`` method, which is
        NOT awaited here.

    Raises:
        KeyError: If no charge point is registered under ``chargepoint_id``.
    """
    # An unknown id surfaces as the KeyError raised by this lookup; no
    # handler is needed just to re-raise it.
    cp = __active_connections[chargepoint_id]
    # NOTE(review): the result is handed back un-awaited. Judging by the
    # return annotation it is presumably a coroutine, so callers would need
    # to await it a second time to obtain the response -- confirm against
    # the call sites before changing this to `return await ...`.
    return cp.call(payload, suppress, unique_id)
def is_connected(chargepoint_id: str) -> bool:
    """Return whether a charge point is registered under ``chargepoint_id``.

    Args:
        chargepoint_id: Registry key to look up.

    Returns:
        ``True`` if the id is currently present in the module-level
        registry, ``False`` otherwise.
    """
    # Membership on the dict itself tests its keys; no .keys() view needed.
    return chargepoint_id in __active_connections