"""OCPP 2.0.1 message handlers that persist charge point state to the database."""

from datetime import datetime, UTC
import os
from uuid import UUID

from ocpp.routing import on, after
from ocpp.v201 import ChargePoint as cp
from ocpp.v201 import call_result
from ocpp.v201.datatypes import IdTokenInfoType, IdTokenType
from ocpp.v201.enums import Action, RegistrationStatusType, AuthorizationStatusType, IdTokenType as IdTokenEnumType, TransactionEventType
from ocpp.v201.call import GetBaseReportPayload

from app.database import SessionLocal
from app.models.chargepoint import ChargePoint as DbChargePoint
from app.models.connector import Connector as DbConnector
from app.models.id_token import IdToken as DbIdToken
from app.models.transaction import Transaction as DbTransaction
from app.models.meter_value import MeterValue as DbMeterValue
from app.schemas.connector import ConnectorStatus
from app.schemas.transaction import TransactionStatus, TransactionEventTriggerReason
from app.schemas.meter_value import Measurand, PhaseType
from app.ocpp_proto.variable_manager import create_or_update_variable


def _energy_register_reading(meter_values, default=0):
    """Return the last energy-import register value found in *meter_values*.

    A sampled value counts when its ``measurand`` equals
    ``str(Measurand.ENERGY_ACTIVE_IMPORT_REGISTER)`` or when it carries no
    ``measurand`` key at all. *default* is returned when nothing matches.
    """
    register_name = str(Measurand.ENERGY_ACTIVE_IMPORT_REGISTER)
    reading = default
    for entry in meter_values:
        for sample in entry["sampled_value"]:
            if "measurand" not in sample or sample["measurand"] == register_name:
                reading = sample["value"]
    return reading


def _build_meter_value(transaction_id, sampled_at, sample):
    """Create an unsaved ``DbMeterValue`` row from one sampled value dict."""
    row = DbMeterValue()
    row.transaction_id = transaction_id
    row.timestamp = sampled_at
    if "measurand" in sample:
        row.measurand = Measurand(sample["measurand"])
    else:
        row.measurand = Measurand.ENERGY_ACTIVE_IMPORT_REGISTER
    if "phase" in sample:
        row.phase_type = PhaseType(sample["phase"])
    # Fall back to "Wh" whenever no unit is supplied, whether the whole
    # unit_of_measure object is missing or only its "unit" member.
    row.unit = sample.get("unit_of_measure", {}).get("unit", "Wh")
    row.value = sample["value"]
    return row


class ChargePoint(cp):
    """Charge point connection whose handlers read and write the database."""

    async def __update_last_seen(self):
        """Set ``last_seen`` of this charge point's row to the current UTC time."""
        with SessionLocal() as db:
            db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.identity == self.id).first()
            db_chargepoint.last_seen = datetime.now(UTC)
            db.commit()

    async def __get_id_token_info(self, id_token):
        """Resolve an incoming id token to an authorization result.

        Args:
            id_token: Mapping with at least the keys ``"type"`` and
                ``"id_token"``.

        Returns:
            A tuple ``(IdTokenInfoType, owner_id)``. ``owner_id`` is ``None``
            unless a stored or newly learned token was found.
        """
        owner_id = None
        if id_token["type"] not in ["ISO14443", "ISO15693"]:
            return IdTokenInfoType(status=AuthorizationStatusType.invalid), owner_id

        with SessionLocal() as db:
            db_id_token = db.query(DbIdToken).filter(DbIdToken.token == id_token["id_token"]).first()

            if db_id_token is None:
                id_token_info = IdTokenInfoType(status=AuthorizationStatusType.unknown)
                db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.identity == self.id).first()

                # Learn token if requested. A request without a deadline is
                # treated as having no open learning window.
                learning_open = (
                    db_chargepoint.learn_user_id is not None
                    and db_chargepoint.learn_until is not None
                    and db_chargepoint.learn_until.timestamp() > datetime.now(UTC).timestamp()
                )
                if learning_open:
                    db_id_token = DbIdToken()
                    db_id_token.friendly_name = f"New token learned by {self.id}"
                    db_id_token.is_active = True
                    db_id_token.owner_id = db_chargepoint.learn_user_id
                    db_id_token.token = id_token["id_token"]
                    db.add(db_id_token)

                    id_token_info = IdTokenInfoType(status=AuthorizationStatusType.accepted)
                    owner_id = db_id_token.owner_id

                    # The learning request is consumed by the first new token.
                    db_chargepoint.learn_user_id = None
                    db_chargepoint.learn_until = None
                    db.commit()
            else:
                owner_id = db_id_token.owner_id
                # Equality test kept on purpose: only an explicit False blocks
                # the token, an unset flag still yields "accepted".
                if db_id_token.is_active == False:  # noqa: E712
                    id_token_info = IdTokenInfoType(status=AuthorizationStatusType.blocked)
                else:
                    id_token_info = IdTokenInfoType(status=AuthorizationStatusType.accepted)

        return id_token_info, owner_id

    @on(Action.BootNotification)
    async def on_boot_notification(self, charging_station, **kwargs):
        """Store the reported station details and accept the registration.

        The heartbeat interval in the response is read from the
        ``CS_HEARTBEAT_INTERVAL`` environment variable (default ``1800``).
        """
        with SessionLocal() as db:
            db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.identity == self.id).first()
            db_chargepoint.last_seen = datetime.now(UTC)
            # Copy only those reported fields that already exist as attributes
            # on the loaded row.
            for key in charging_station:
                if key in db_chargepoint.__dict__:
                    setattr(db_chargepoint, key, charging_station[key])
            db.commit()

        return call_result.BootNotificationPayload(
            current_time=datetime.now(UTC).isoformat(),
            interval=int(os.getenv("CS_HEARTBEAT_INTERVAL", "1800")),
            status=RegistrationStatusType.accepted,
        )

    @after(Action.BootNotification)
    async def after_boot_notification(self, **kwargs):
        """Request a ``FullInventory`` base report once the boot was answered."""
        await self.call(payload=GetBaseReportPayload(request_id=0, report_base="FullInventory"))

    @on(Action.NotifyReport)
    async def on_notify_report(self, report_data, **kwargs):
        """Pass every reported entry to ``create_or_update_variable``."""
        with SessionLocal() as db:
            db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.identity == self.id).first()
            for entry in report_data:
                await create_or_update_variable(
                    chargepoint_id=db_chargepoint.id,
                    report_data=entry,
                )
        return call_result.NotifyReportPayload()

    @on(Action.Heartbeat)
    async def on_heartbeat_request(self):
        """Refresh ``last_seen`` and answer with the current UTC time."""
        await self.__update_last_seen()
        return call_result.HeartbeatPayload(
            current_time=datetime.now(UTC).isoformat()
        )

    @on(Action.StatusNotification)
    async def on_status_notification(self, evse_id: int, connector_id: int, connector_status: str, **kwargs):
        """Create or update the connector row with the reported status."""
        with SessionLocal() as db:
            db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.identity == self.id).first()
            db_chargepoint.last_seen = datetime.now(UTC)

            db_connector = db.query(DbConnector).filter(
                DbConnector.chargepoint_id == db_chargepoint.id,
                DbConnector.evse == evse_id,
                DbConnector.index == connector_id,
            ).first()
            if db_connector is None:
                db_connector = DbConnector(
                    chargepoint_id=db_chargepoint.id,
                    evse=evse_id,
                    index=connector_id,
                    status=ConnectorStatus(connector_status),
                )
                db.add(db_connector)
            else:
                db_connector.status = ConnectorStatus(connector_status)
            db.commit()

        return call_result.StatusNotificationPayload()

    @on(Action.Authorize)
    async def on_authorize(self, id_token, **kwargs):
        """Refresh ``last_seen`` and return the authorization state of *id_token*."""
        await self.__update_last_seen()
        id_token_info, _ = await self.__get_id_token_info(id_token)
        return call_result.AuthorizePayload(id_token_info)

    @on(Action.TransactionEvent)
    async def on_transaction_event(
        self,
        event_type,
        timestamp,
        trigger_reason,
        transaction_info,
        **kwargs
    ):
        """Persist a started, updated or ended transaction event.

        Args:
            event_type: Compared against ``str(TransactionEventType.<member>)``.
            timestamp: ISO 8601 string, parsed with ``datetime.fromisoformat``.
            trigger_reason: Stored as the end reason of an ended transaction.
            transaction_info: Mapping containing ``"transaction_id"``.
            **kwargs: Optional ``id_token`` and ``meter_value`` entries.

        Returns:
            ``TransactionEventPayload``, including ``id_token_info`` when the
            request carried an ``id_token``.
        """
        has_token = "id_token" in kwargs
        id_token_info = None
        token_owner_id = None
        if has_token:
            id_token_info, token_owner_id = await self.__get_id_token_info(kwargs['id_token'])
        # The transaction is attributed to the token owner only for accepted tokens.
        token_accepted = has_token and id_token_info.status == AuthorizationStatusType.accepted

        with SessionLocal() as db:
            chargepoint = db.query(DbChargePoint).filter(DbChargePoint.identity == self.id).first()
            chargepoint.last_seen = datetime.now(UTC)

            if event_type == str(TransactionEventType.started):
                transaction = DbTransaction(
                    id=transaction_info["transaction_id"],
                    status=TransactionStatus.ONGOING,
                    started_at=datetime.fromisoformat(timestamp),
                    meter_start=_energy_register_reading(kwargs.get('meter_value', [])),
                    price=chargepoint.price,
                    chargepoint_id=chargepoint.id,
                )
                if token_accepted:
                    transaction.user_id = token_owner_id
                db.add(transaction)

            elif event_type == str(TransactionEventType.updated):
                transaction = db.get(DbTransaction, transaction_info["transaction_id"])
                # Updates for unknown or no longer ongoing transactions are ignored.
                if transaction is not None and transaction.status == TransactionStatus.ONGOING:
                    for meter_value_entry in kwargs.get('meter_value', []):
                        sampled_at = datetime.fromisoformat(meter_value_entry['timestamp'])
                        for sampled_value in meter_value_entry['sampled_value']:
                            db.add(_build_meter_value(transaction.id, sampled_at, sampled_value))
                    if token_accepted:
                        transaction.user_id = token_owner_id

            elif event_type == str(TransactionEventType.ended):
                transaction = db.get(DbTransaction, transaction_info["transaction_id"])
                if transaction is not None:
                    transaction.status = TransactionStatus.ENDED
                    transaction.ended_at = datetime.fromisoformat(timestamp)
                    transaction.end_reason = TransactionEventTriggerReason(trigger_reason)
                    transaction.meter_end = _energy_register_reading(kwargs.get('meter_value', []))
                    if token_accepted:
                        transaction.user_id = token_owner_id

            db.commit()

        if has_token:
            return call_result.TransactionEventPayload(id_token_info=id_token_info)
        return call_result.TransactionEventPayload()

    @on(Action.MeterValues)
    async def on_meter_values(self, **kwargs):
        """Acknowledge the message; the values are not stored here."""
        return call_result.MeterValuesPayload()

    @on(Action.SecurityEventNotification)
    async def on_security_event_notification(self, **kwargs):
        """Acknowledge the message without further processing."""
        return call_result.SecurityEventNotificationPayload()