simple-ocpp-cs/app/ocpp_proto/chargepoint.py

89 lines
3.7 KiB
Python
Raw Normal View History

2024-04-13 22:43:03 +02:00
from datetime import datetime, UTC
import os
from ocpp.routing import on
from ocpp.v201 import ChargePoint as cp
from ocpp.v201 import call_result
from ocpp.v201.enums import Action, RegistrationStatusType, AuthorizationStatusType
from app.database import SessionLocal
from app.models.chargepoint import ChargePoint
from app.models.connector import Connector
from app.models.id_token import IdToken
from app.schemas.connector import ConnectorStatus
class ChargePoint(cp):
@on(Action.BootNotification)
async def on_boot_notification(self, charging_station, reason, **kwargs):
with SessionLocal() as db:
db_chargepoint = db.query(ChargePoint).filter(ChargePoint.friendly_name == self.id).first()
db_chargepoint.last_seen = datetime.now(UTC)
db.commit()
return call_result.BootNotificationPayload(
current_time=datetime.now(UTC).isoformat(),
interval=int(os.getenv("CS_HEARTBEAT_INTERVAL", "1800")),
status=RegistrationStatusType.accepted
)
@on(Action.Heartbeat)
async def on_heartbeat_request(self):
with SessionLocal() as db:
db_chargepoint = db.query(ChargePoint).filter(ChargePoint.friendly_name == self.id).first()
db_chargepoint.last_seen = datetime.now(UTC)
db.commit()
return call_result.HeartbeatPayload(
current_time=datetime.now(UTC).isoformat()
)
@on(Action.StatusNotification)
async def on_status_notification(self, evse_id: int, connector_id: int, connector_status: str, **kwargs):
with SessionLocal() as db:
db_chargepoint = db.query(ChargePoint).filter(ChargePoint.friendly_name == self.id).first()
db_chargepoint.last_seen = datetime.now(UTC)
db_connector = db.query(Connector).filter(
Connector.chargepoint_id == db_chargepoint.id,
Connector.evse == evse_id,
Connector.index == connector_id
).first()
if db_connector == None:
db_connector = Connector(
chargepoint_id = db_chargepoint.id,
evse = evse_id,
index = connector_id,
status = ConnectorStatus(connector_status)
)
db.add(db_connector)
else:
db_connector.status = ConnectorStatus(connector_status)
db.commit()
return call_result.StatusNotificationPayload()
@on(Action.Authorize)
async def on_authorize(self, id_token, **kwargs):
if id_token == None:
return call_result.AuthorizePayload(id_token_info={'status': AuthorizationStatusType.invalid})
if id_token.type != "ISO14443" | "ISO15693":
return call_result.AuthorizePayload(id_token_info={'status': AuthorizationStatusType.invalid})
with SessionLocal() as db:
db_chargepoint = db.query(ChargePoint).filter(ChargePoint.friendly_name == self.id).first()
db_chargepoint.last_seen = datetime.now(UTC)
db_id_token = db.query(IdToken).filter(IdToken.token == id_token.id).first()
db.commit()
if db_id_token == None:
return call_result.AuthorizePayload(id_token_info={'status': AuthorizationStatusType.unknown})
if db_id_token.is_active == False:
return call_result.AuthorizePayload(id_token_info={'status': AuthorizationStatusType.blocked})
return call_result.AuthorizePayload(id_token_info={'status': AuthorizationStatusType.accepted, 'groupIdToken': str(db_id_token.owner_id)})
@on(Action.TransactionEvent)
async def on_transaction_event(self):
return call_result.TransactionEventPayload()