from datetime import datetime, UTC
import os

from ocpp.routing import on, after
from ocpp.v201 import ChargePoint as cp
from ocpp.v201 import call_result
from ocpp.v201.enums import Action, RegistrationStatusType, TransactionEventType
from ocpp.v201.call import GetBaseReportPayload

from app.services import (
    variable_service,
    id_token_service,
    chargepoint_service,
    transaction_service
)


class ChargePoint(cp):
    """Message handlers for a single connected charging station.

    Each ``@on`` handler builds the ``call_result`` payload returned to the
    station for one ``ocpp.v201`` action; each ``@after`` handler runs
    follow-up work for the same action. Persistence is delegated to the
    ``app.services`` modules, always keyed by ``self.id``.
    """

    @on(Action.BootNotification)
    async def on_boot_notification(self, charging_station, **kwargs):
        """Store the reported station attributes and accept the boot.

        The heartbeat interval is read from the ``CS_HEARTBEAT_INTERVAL``
        environment variable and defaults to 1800 when it is unset.

        Raises:
            ValueError: if ``CS_HEARTBEAT_INTERVAL`` is set to a value that
                ``int()`` cannot parse.
        """
        await chargepoint_service.update_attributes(
            chargepoint_identity=self.id,
            charging_station=charging_station,
        )
        return call_result.BootNotificationPayload(
            current_time=datetime.now(UTC).isoformat(),
            interval=int(os.getenv("CS_HEARTBEAT_INTERVAL", "1800")),
            status=RegistrationStatusType.accepted,
        )

    @after(Action.BootNotification)
    async def after_boot_notification(self, **kwargs):
        """Request a ``FullInventory`` base report from the station."""
        await self.call(
            payload=GetBaseReportPayload(request_id=0, report_base="FullInventory")
        )

    @on(Action.NotifyReport)
    async def on_notify_report(self, report_data, **kwargs):
        """Pass every entry of ``report_data`` to the variable service."""
        # Entries are written one at a time, in the order they were received.
        for entry in report_data:
            await variable_service.create_or_update_variable(
                chargepoint_identity=self.id,
                report_entry=entry,
            )
        return call_result.NotifyReportPayload()

    @on(Action.Heartbeat)
    async def on_heartbeat_request(self):
        """Answer a heartbeat with the current UTC time in ISO 8601 form."""
        return call_result.HeartbeatPayload(
            current_time=datetime.now(UTC).isoformat()
        )

    @after(Action.Heartbeat)
    async def after_heartbeat_request(self):
        """Ask the charge point service to update this station's last-seen record."""
        await chargepoint_service.update_last_seen(chargepoint_identity=self.id)

    @on(Action.StatusNotification)
    async def on_status_notification(
        self, evse_id: int, connector_id: int, connector_status: str, **kwargs
    ):
        """Forward the reported connector status to the charge point service."""
        await chargepoint_service.create_or_update_connector(
            chargepoint_identity=self.id,
            evse_id=evse_id,
            connector_id=connector_id,
            connector_status=connector_status,
        )
        return call_result.StatusNotificationPayload()

    @on(Action.Authorize)
    async def on_authorize(self, id_token, **kwargs):
        """Look up ``id_token`` and return its token info to the station."""
        # The lookup returns a pair; the second element (used as the owner's
        # user id in on_transaction_event) is not needed here.
        id_token_info, _ = await id_token_service.get_id_token_info(
            chargepoint_id=self.id, id_token=id_token
        )
        return call_result.AuthorizePayload(id_token_info=id_token_info)

    @on(Action.TransactionEvent)
    async def on_transaction_event(
        self, event_type, timestamp, trigger_reason, transaction_info, **kwargs
    ):
        """Dispatch a transaction event to the transaction service.

        Args:
            event_type: compared against the ``TransactionEventType`` members
                ``started``, ``updated`` and ``ended``; any other value is
                acknowledged without calling the transaction service.
            timestamp: string parsed with ``datetime.fromisoformat``.
            trigger_reason: forwarded only when the transaction ends.
            transaction_info: mapping; its ``"transaction_id"`` entry is read
                for ``updated`` and ``ended`` events.
            **kwargs: remaining request fields, forwarded unchanged as
                ``transaction_data``. When ``id_token`` is among them, the
                token is looked up and its info is echoed in the response.

        Returns:
            A ``TransactionEventPayload``, carrying ``id_token_info`` only
            when the token lookup produced one.
        """
        if "id_token" in kwargs:
            id_token_info, token_owner_id = await id_token_service.get_id_token_info(
                chargepoint_id=self.id, id_token=kwargs["id_token"]
            )
        else:
            id_token_info = None
            token_owner_id = None

        # Compare with the enum members directly: they are str subclasses, so
        # equality with the wire value holds whether the enum is a StrEnum or
        # a plain (str, Enum) mixin. Comparing with str(member) only works
        # for the former.
        if event_type == TransactionEventType.started:
            await transaction_service.create_transaction(
                chargepoint_identity=self.id,
                user_id=token_owner_id,
                timestamp=datetime.fromisoformat(timestamp),
                transaction_info=transaction_info,
                transaction_data=kwargs,
            )
        elif event_type == TransactionEventType.updated:
            await transaction_service.update_transaction(
                transaction_id=transaction_info["transaction_id"],
                transaction_data=kwargs,
            )
        elif event_type == TransactionEventType.ended:
            await transaction_service.end_transaction(
                transaction_id=transaction_info["transaction_id"],
                timestamp=datetime.fromisoformat(timestamp),
                trigger_reason=trigger_reason,
                transaction_data=kwargs,
                user_id=token_owner_id,
            )

        if id_token_info is None:
            return call_result.TransactionEventPayload()
        return call_result.TransactionEventPayload(id_token_info=id_token_info)

    @on(Action.MeterValues)
    async def on_meter_values(self, **kwargs):
        """Acknowledge meter values; the request content is not processed here."""
        return call_result.MeterValuesPayload()

    @on(Action.SecurityEventNotification)
    async def on_security_event_notification(self, **kwargs):
        """Acknowledge a security event; the request content is not processed here."""
        return call_result.SecurityEventNotificationPayload()