simple-ocpp-cs/app/ocpp_proto/chargepoint_manager.py

33 lines
907 B
Python
Raw Permalink Normal View History

2024-03-28 21:23:25 +01:00
import logging
from typing import Any, Coroutine, Dict
2024-04-13 22:43:03 +02:00
from uuid import UUID
2024-03-28 21:23:25 +01:00
from websockets import ConnectionClosed
2024-04-13 22:43:03 +02:00
from app.ocpp_proto.chargepoint import ChargePoint
2024-03-28 21:23:25 +01:00
2024-04-13 22:43:03 +02:00
__active_connections: Dict[UUID, ChargePoint] = {}
2024-03-28 21:23:25 +01:00
2024-04-13 22:43:03 +02:00
async def start(id: UUID, cp: ChargePoint):
2024-03-28 21:23:25 +01:00
try:
2024-04-13 22:43:03 +02:00
__active_connections[id] = cp
2024-03-28 21:23:25 +01:00
await cp.start()
except ConnectionClosed:
2024-04-13 22:43:03 +02:00
logging.info("Charging station '%s' (%s) disconnected", cp.id, id)
__active_connections.pop(id, None)
2024-03-28 21:23:25 +01:00
async def call(
2024-04-13 22:43:03 +02:00
chargepoint_id: UUID,
2024-03-28 21:23:25 +01:00
payload: Any,
suppress: bool = True,
unique_id: Any | None = None
) -> Coroutine[Any, Any, Any | None]:
try:
cp = __active_connections[chargepoint_id]
2024-04-14 22:56:51 +02:00
return await cp.call(payload, suppress, unique_id)
2024-03-28 21:23:25 +01:00
except KeyError as e:
raise e
2024-04-13 22:43:03 +02:00
def is_connected(chargepoint_id: UUID):
2024-03-28 21:23:25 +01:00
return chargepoint_id in __active_connections.keys()