simple-ocpp-cs/backend/app/ocpp_proto/chargepoint.py
2025-03-13 22:11:20 +01:00

118 lines
4.3 KiB
Python

from datetime import datetime, UTC
import os
from ocpp.routing import on, after
from ocpp.v201 import ChargePoint as cp
from ocpp.v201 import call_result
from ocpp.v201.enums import Action, RegistrationStatusEnumType, TransactionEventEnumType
from ocpp.v201.call import GetBaseReport
from app.services import (
variable_service,
id_token_service,
chargepoint_service,
transaction_service
)
class ChargePoint(cp):
    """OCPP 2.0.1 message handlers for a single connected charge point.

    Each ``@on`` handler builds the response for one incoming action and
    forwards the relevant payload to the application services; ``@after``
    handlers run follow-up work for the same action. All handlers accept
    ``**kwargs`` so optional payload fields they do not use (for example
    ``custom_data``) do not cause a ``TypeError``.
    """

    @on(Action.boot_notification)
    async def on_boot_notification(self, charging_station, **kwargs):
        """Store the reported station attributes and accept the boot request.

        The heartbeat interval in the response is read from the
        ``CS_HEARTBEAT_INTERVAL`` environment variable (default ``1800``).
        """
        await chargepoint_service.update_attributes(
            chargepoint_identity=self.id,
            charging_station=charging_station,
        )
        return call_result.BootNotification(
            current_time=datetime.now(UTC).isoformat(),
            interval=int(os.getenv("CS_HEARTBEAT_INTERVAL", "1800")),
            status=RegistrationStatusEnumType.accepted,
        )

    @after(Action.boot_notification)
    async def after_boot_notification(self, **kwargs):
        """Request a ``FullInventory`` base report after a boot notification."""
        await self.call(
            payload=GetBaseReport(request_id=0, report_base="FullInventory")
        )

    @on(Action.notify_report)
    async def on_notify_report(self, report_data=None, **kwargs):
        """Pass every entry of a report to the variable service.

        ``report_data`` defaults to ``None`` because the field is optional in
        the request; a report without entries is simply acknowledged.
        """
        for entry in report_data or []:
            await variable_service.create_or_update_variable(
                chargepoint_identity=self.id,
                report_entry=entry,
            )
        return call_result.NotifyReport()

    @on(Action.heartbeat)
    async def on_heartbeat_request(self, **kwargs):
        """Answer a heartbeat with the current UTC time."""
        return call_result.Heartbeat(
            current_time=datetime.now(UTC).isoformat()
        )

    @after(Action.heartbeat)
    async def after_heartbeat_request(self, **kwargs):
        """Record that this charge point was just seen."""
        await chargepoint_service.update_last_seen(chargepoint_identity=self.id)

    @on(Action.status_notification)
    async def on_status_notification(
        self, evse_id: int, connector_id: int, connector_status: str, **kwargs
    ):
        """Forward the reported connector status to the charge point service."""
        await chargepoint_service.create_or_update_connector(
            chargepoint_identity=self.id,
            evse_id=evse_id,
            connector_id=connector_id,
            connector_status=connector_status,
        )
        return call_result.StatusNotification()

    @on(Action.authorize)
    async def on_authorize(self, id_token, **kwargs):
        """Look up the presented token and return its token info."""
        # The service returns a pair; only the token info is needed here.
        id_token_info, _ = await id_token_service.get_id_token_info(
            chargepoint_id=self.id, id_token=id_token
        )
        return call_result.Authorize(id_token_info=id_token_info)

    @on(Action.transaction_event)
    async def on_transaction_event(
        self,
        event_type,
        timestamp,
        trigger_reason,
        transaction_info,
        **kwargs
    ):
        """Create, update or end a transaction depending on ``event_type``.

        When the event carries an ``id_token`` it is resolved first; the
        resulting token info is echoed in the response and the owner id is
        handed to the transaction service. Event types other than started,
        updated and ended are acknowledged without touching any transaction.
        """
        if "id_token" in kwargs:
            id_token_info, token_owner_id = await id_token_service.get_id_token_info(
                chargepoint_id=self.id, id_token=kwargs["id_token"]
            )
        else:
            id_token_info = None
            token_owner_id = None

        # kwargs (all remaining payload fields, including any id_token) is
        # passed through unchanged as the transaction data.
        if event_type == str(TransactionEventEnumType.started):
            await transaction_service.create_transaction(
                chargepoint_identity=self.id,
                user_id=token_owner_id,
                timestamp=datetime.fromisoformat(timestamp),
                transaction_info=transaction_info,
                transaction_data=kwargs,
            )
        elif event_type == str(TransactionEventEnumType.updated):
            await transaction_service.update_transaction(
                transaction_id=transaction_info["transaction_id"],
                transaction_data=kwargs,
            )
        elif event_type == str(TransactionEventEnumType.ended):
            await transaction_service.end_transaction(
                transaction_id=transaction_info["transaction_id"],
                timestamp=datetime.fromisoformat(timestamp),
                trigger_reason=trigger_reason,
                transaction_data=kwargs,
                user_id=token_owner_id,
            )

        if id_token_info is None:
            return call_result.TransactionEvent()
        return call_result.TransactionEvent(id_token_info=id_token_info)

    @on(Action.meter_values)
    async def on_meter_values(self, **kwargs):
        """Acknowledge meter values; the payload is not processed."""
        return call_result.MeterValues()

    @on(Action.security_event_notification)
    async def on_security_event_notification(self, **kwargs):
        """Acknowledge a security event notification; the payload is not processed."""
        return call_result.SecurityEventNotification()