"""OCPP 2.0.1 message handlers for a connected charging station."""

import logging
import os
from datetime import datetime, UTC

from ocpp.routing import on
from ocpp.v201 import ChargePoint as cp
from ocpp.v201 import call_result
from ocpp.v201.enums import Action, RegistrationStatusType, AuthorizationStatusType

from app.database import SessionLocal
# Aliased on purpose: the handler class defined below is also called
# ``ChargePoint`` and would otherwise shadow the ORM model inside its own
# methods, making ``db.query(ChargePoint)`` query the wrong class.
from app.models.chargepoint import ChargePoint as ChargePointModel
from app.models.connector import Connector
from app.models.id_token import IdToken
from app.schemas.connector import ConnectorStatus

logger = logging.getLogger(__name__)

# Token types this central system is willing to authorize.
_SUPPORTED_ID_TOKEN_TYPES = ("ISO14443", "ISO15693")


def _touch_last_seen(db, friendly_name):
    """Stamp ``last_seen`` on the charge point row named *friendly_name*.

    The change is left pending in *db*; the caller commits.

    Returns:
        The ORM row, or ``None`` when no charge point with that friendly
        name exists (a warning is logged in that case).
    """
    db_chargepoint = (
        db.query(ChargePointModel)
        .filter(ChargePointModel.friendly_name == friendly_name)
        .first()
    )
    if db_chargepoint is None:
        logger.warning("Message from unknown charge point %r", friendly_name)
        return None
    db_chargepoint.last_seen = datetime.now(UTC)
    return db_chargepoint


def _id_token_fields(id_token):
    """Return ``(type, value)`` extracted from an Authorize ``id_token``.

    Handlers receive nested payload objects as plain dicts with snake_case
    keys, so the dict form is the expected one; attribute access is kept as
    a fallback for callers that pass an object exposing ``.type`` / ``.id``.
    Missing fields come back as ``None``.
    """
    if isinstance(id_token, dict):
        return id_token.get("type"), id_token.get("id_token")
    return getattr(id_token, "type", None), getattr(id_token, "id", None)


class ChargePoint(cp):
    """Central-system side handlers for one charging station connection.

    ``self.id`` (set by the base class) is matched against the
    ``friendly_name`` column of the charge point table.
    """

    @on(Action.BootNotification)
    async def on_boot_notification(self, charging_station, reason, **kwargs):
        """Record the station as seen and accept its registration.

        The heartbeat interval is read from ``CS_HEARTBEAT_INTERVAL``
        (seconds, default 1800).
        """
        with SessionLocal() as db:
            _touch_last_seen(db, self.id)
            db.commit()

        return call_result.BootNotificationPayload(
            current_time=datetime.now(UTC).isoformat(),
            interval=int(os.getenv("CS_HEARTBEAT_INTERVAL", "1800")),
            status=RegistrationStatusType.accepted,
        )

    @on(Action.Heartbeat)
    async def on_heartbeat_request(self, **kwargs):
        """Record the station as seen and reply with the current UTC time."""
        with SessionLocal() as db:
            _touch_last_seen(db, self.id)
            db.commit()

        return call_result.HeartbeatPayload(
            current_time=datetime.now(UTC).isoformat(),
        )

    @on(Action.StatusNotification)
    async def on_status_notification(
        self, evse_id: int, connector_id: int, connector_status: str, **kwargs
    ):
        """Create or update the connector row with the reported status.

        Raises:
            ValueError: if *connector_status* is not a valid
                ``ConnectorStatus`` value.
        """
        with SessionLocal() as db:
            db_chargepoint = _touch_last_seen(db, self.id)

            # Without a charge point row there is nothing to attach the
            # connector to, so the upsert is skipped.
            if db_chargepoint is not None:
                status = ConnectorStatus(connector_status)
                db_connector = (
                    db.query(Connector)
                    .filter(
                        Connector.chargepoint_id == db_chargepoint.id,
                        Connector.evse == evse_id,
                        Connector.index == connector_id,
                    )
                    .first()
                )
                if db_connector is None:
                    db.add(
                        Connector(
                            chargepoint_id=db_chargepoint.id,
                            evse=evse_id,
                            index=connector_id,
                            status=status,
                        )
                    )
                else:
                    db_connector.status = status

            db.commit()

        return call_result.StatusNotificationPayload()

    @on(Action.Authorize)
    async def on_authorize(self, id_token, **kwargs):
        """Authorize an identification token against the token table.

        Returns an ``AuthorizePayload`` whose ``id_token_info.status`` is:
        ``invalid`` for a missing token or unsupported token type,
        ``unknown`` when the token is not in the database,
        ``blocked`` when the token is marked inactive, and
        ``accepted`` otherwise.
        """
        invalid = {'status': AuthorizationStatusType.invalid}

        if id_token is None:
            return call_result.AuthorizePayload(id_token_info=invalid)

        token_type, token_value = _id_token_fields(id_token)
        if token_type not in _SUPPORTED_ID_TOKEN_TYPES or token_value is None:
            return call_result.AuthorizePayload(id_token_info=invalid)

        with SessionLocal() as db:
            _touch_last_seen(db, self.id)
            db_id_token = (
                db.query(IdToken).filter(IdToken.token == token_value).first()
            )
            db.commit()

            # The token row is inspected while the session is still open so
            # attributes expired by the commit can be reloaded.
            if db_id_token is None:
                return call_result.AuthorizePayload(
                    id_token_info={'status': AuthorizationStatusType.unknown}
                )
            if db_id_token.is_active is False:
                return call_result.AuthorizePayload(
                    id_token_info={'status': AuthorizationStatusType.blocked}
                )
            # NOTE(review): OCPP 2.0.1 defines groupIdToken as an IdTokenType
            # object, not a bare string - confirm this passes schema
            # validation before relying on it.
            return call_result.AuthorizePayload(
                id_token_info={
                    'status': AuthorizationStatusType.accepted,
                    'groupIdToken': str(db_id_token.owner_id),
                }
            )

    @on(Action.TransactionEvent)
    async def on_transaction_event(self, **kwargs):
        """Acknowledge a transaction event without processing it.

        ``**kwargs`` absorbs the request payload fields, which the router
        passes as keyword arguments.
        """
        return call_result.TransactionEventPayload()