simple-ocpp-cs/app/ocpp_proto/chargepoint.py

107 lines
4.3 KiB
Python

from datetime import datetime, UTC
import os
from ocpp.routing import on
from ocpp.v201 import ChargePoint as cp
from ocpp.v201 import call_result
from ocpp.v201.datatypes import IdTokenInfoType, IdTokenType
from ocpp.v201.enums import Action, RegistrationStatusType, AuthorizationStatusType, IdTokenType as IdTokenEnumType
from app.database import SessionLocal
from app.models.chargepoint import ChargePoint as DbChargePoint
from app.models.connector import Connector
from app.models.id_token import IdToken
from app.schemas.connector import ConnectorStatus
class ChargePoint(cp):
@on(Action.BootNotification)
async def on_boot_notification(self, charging_station, **kwargs):
with SessionLocal() as db:
db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.friendly_name == self.id).first()
db_chargepoint.last_seen = datetime.now(UTC)
for key in charging_station.keys():
if key in db_chargepoint.__dict__:
setattr(db_chargepoint, key, charging_station[key])
db.commit()
return call_result.BootNotificationPayload(
current_time=datetime.now(UTC).isoformat(),
interval=int(os.getenv("CS_HEARTBEAT_INTERVAL", "1800")),
status=RegistrationStatusType.accepted
)
@on(Action.Heartbeat)
async def on_heartbeat_request(self):
with SessionLocal() as db:
db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.friendly_name == self.id).first()
db_chargepoint.last_seen = datetime.now(UTC)
db.commit()
return call_result.HeartbeatPayload(
current_time=datetime.now(UTC).isoformat()
)
@on(Action.StatusNotification)
async def on_status_notification(self, evse_id: int, connector_id: int, connector_status: str, **kwargs):
with SessionLocal() as db:
db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.friendly_name == self.id).first()
db_chargepoint.last_seen = datetime.now(UTC)
db_connector = db.query(Connector).filter(
Connector.chargepoint_id == db_chargepoint.id,
Connector.evse == evse_id,
Connector.index == connector_id
).first()
if db_connector == None:
db_connector = Connector(
chargepoint_id = db_chargepoint.id,
evse = evse_id,
index = connector_id,
status = ConnectorStatus(connector_status)
)
db.add(db_connector)
else:
db_connector.status = ConnectorStatus(connector_status)
db.commit()
return call_result.StatusNotificationPayload()
@on(Action.Authorize)
async def on_authorize(self, id_token, **kwargs):
if id_token["type"] not in ["ISO14443", "ISO15693"]:
return call_result.AuthorizePayload(
id_token_info=IdTokenInfoType(
status=AuthorizationStatusType.invalid
)
)
with SessionLocal() as db:
db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.friendly_name == self.id).first()
db_chargepoint.last_seen = datetime.now(UTC)
db_id_token = db.query(IdToken).filter(IdToken.token == id_token["id_token"]).first()
db.commit()
if db_id_token == None:
id_token_info=IdTokenInfoType(
status=AuthorizationStatusType.unknown
)
else:
if db_id_token.is_active == False:
id_token_info=IdTokenInfoType(
status=AuthorizationStatusType.blocked
)
else:
id_token_info=IdTokenInfoType(
status=AuthorizationStatusType.accepted,
group_id_token=IdTokenType(
type=IdTokenEnumType.central,
id_token=str(db_id_token.owner_id)
)
)
return call_result.AuthorizePayload(id_token_info)
@on(Action.TransactionEvent)
async def on_transaction_event(self):
return call_result.TransactionEventPayload()