simple-ocpp-cs/app/ocpp_proto/chargepoint.py
Oliver Traber 970da93192
All checks were successful
ci/woodpecker/push/docker Pipeline was successful
Add SecurityEventNotification handler
2024-04-20 17:46:46 +02:00

244 lines
12 KiB
Python

from datetime import datetime, UTC
import os
from uuid import UUID
from ocpp.routing import on, after
from ocpp.v201 import ChargePoint as cp
from ocpp.v201 import call_result
from ocpp.v201.datatypes import IdTokenInfoType, IdTokenType
from ocpp.v201.enums import Action, RegistrationStatusType, AuthorizationStatusType, IdTokenType as IdTokenEnumType, TransactionEventType
from ocpp.v201.call import GetBaseReportPayload
from app.database import SessionLocal
from app.models.chargepoint import ChargePoint as DbChargePoint
from app.models.connector import Connector as DbConnector
from app.models.id_token import IdToken as DbIdToken
from app.models.transaction import Transaction as DbTransaction
from app.models.meter_value import MeterValue as DbMeterValue
from app.schemas.connector import ConnectorStatus
from app.schemas.transaction import TransactionStatus, TransactionEventTriggerReason
from app.schemas.meter_value import Measurand, PhaseType
from app.ocpp_proto.variable_manager import create_or_update_variable
class ChargePoint(cp):
    """OCPP 2.0.1 message handlers for one connected charging station.

    Every ``@on`` handler answers one incoming action of the station
    identified by ``self.id`` and persists the relevant state through a
    short-lived ``SessionLocal`` database session.
    """

    async def __update_last_seen(self):
        """Set the charge point's ``last_seen`` to the current UTC time."""
        with SessionLocal() as db:
            db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.identity == self.id).first()
            db_chargepoint.last_seen = datetime.now(UTC)
            db.commit()

    @staticmethod
    def __accepted_token_info(owner_id) -> IdTokenInfoType:
        """Build an ``accepted`` token info whose group token carries the owner id."""
        return IdTokenInfoType(
            status=AuthorizationStatusType.accepted,
            group_id_token=IdTokenType(
                type=IdTokenEnumType.central,
                id_token=str(owner_id)
            )
        )

    async def __get_id_token_info(self, id_token) -> IdTokenInfoType:
        """Resolve an incoming id token to its authorization status.

        Args:
            id_token: Mapping with at least the keys ``"type"`` and
                ``"id_token"`` as received from the charging station.

        Returns:
            ``invalid`` for token types other than ISO14443/ISO15693,
            ``unknown`` for tokens not in the database (unless the charge
            point is currently learning tokens, in which case the token is
            stored and ``accepted``), ``blocked`` for inactive tokens and
            ``accepted`` otherwise. Accepted results carry the owner id as a
            ``central`` group id token.
        """
        if id_token["type"] not in ["ISO14443", "ISO15693"]:
            return IdTokenInfoType(status=AuthorizationStatusType.invalid)

        with SessionLocal() as db:
            db_id_token = db.query(DbIdToken).filter(DbIdToken.token == id_token["id_token"]).first()
            if db_id_token is None:
                id_token_info = IdTokenInfoType(status=AuthorizationStatusType.unknown)
                db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.identity == self.id).first()
                # Learn token if requested: a learn user must be set and the
                # learn window must still be open. A missing deadline is
                # treated as "not learning" instead of raising AttributeError.
                learn_until = db_chargepoint.learn_until
                learning = (
                    db_chargepoint.learn_user_id is not None
                    and learn_until is not None
                    and learn_until.timestamp() > datetime.now(UTC).timestamp()
                )
                if learning:
                    db_id_token = DbIdToken()
                    db_id_token.friendly_name = f"New token learned by {self.id}"
                    db_id_token.is_active = True
                    db_id_token.owner_id = db_chargepoint.learn_user_id
                    db_id_token.token = id_token["id_token"]
                    db.add(db_id_token)
                    id_token_info = self.__accepted_token_info(db_id_token.owner_id)
                    # Learning is one-shot: clear the request once a token was stored.
                    db_chargepoint.learn_user_id = None
                    db_chargepoint.learn_until = None
                    db.commit()
            # Explicit comparison on purpose: only a stored False blocks the
            # token; any other value (including None) falls through to accepted.
            elif db_id_token.is_active == False:  # noqa: E712
                id_token_info = IdTokenInfoType(status=AuthorizationStatusType.blocked)
            else:
                id_token_info = self.__accepted_token_info(db_id_token.owner_id)
        return id_token_info

    @staticmethod
    def __energy_register_reading(meter_value):
        """Return the last energy-register sample value in ``meter_value``.

        A sampled value counts when its ``"measurand"`` equals the active
        import energy register or when it has no ``"measurand"`` key at all.
        Returns ``0`` when ``meter_value`` is ``None``/empty or nothing matches.
        """
        reading = 0
        for meter_value_entry in meter_value or []:
            for sampled_value in meter_value_entry['sampled_value']:
                if (
                    "measurand" not in sampled_value
                    or sampled_value['measurand'] == str(Measurand.ENERGY_ACTIVE_IMPORT_REGISTER)
                ):
                    reading = sampled_value['value']
        return reading

    @staticmethod
    def __store_meter_values(db, transaction, meter_value):
        """Add one ``DbMeterValue`` row per sampled value to the session.

        Samples without a measurand are stored as active import energy
        register; samples without an explicit unit are stored with ``"Wh"``.
        Nothing is committed here; the caller owns the transaction.
        """
        for meter_value_entry in meter_value or []:
            sampled_at = datetime.fromisoformat(meter_value_entry['timestamp'])
            for sampled_value in meter_value_entry['sampled_value']:
                db_meter_value = DbMeterValue()
                db_meter_value.transaction_id = transaction.id
                db_meter_value.timestamp = sampled_at
                if "measurand" in sampled_value:
                    db_meter_value.measurand = Measurand(sampled_value['measurand'])
                else:
                    db_meter_value.measurand = Measurand.ENERGY_ACTIVE_IMPORT_REGISTER
                if "phase" in sampled_value:
                    db_meter_value.phase_type = PhaseType(sampled_value['phase'])
                # Fall back to "Wh" whenever no unit is given, whether the
                # whole unit_of_measure object or only its unit is missing.
                unit_of_measure = sampled_value.get('unit_of_measure')
                if unit_of_measure is not None and "unit" in unit_of_measure:
                    db_meter_value.unit = unit_of_measure['unit']
                else:
                    db_meter_value.unit = "Wh"
                db_meter_value.value = sampled_value['value']
                db.add(db_meter_value)

    @staticmethod
    def __assign_user(transaction, id_token_info):
        """Attach the token owner to ``transaction`` if the token was accepted."""
        if id_token_info is not None and id_token_info.status == AuthorizationStatusType.accepted:
            transaction.user_id = UUID(id_token_info.group_id_token.id_token)

    @on(Action.BootNotification)
    async def on_boot_notification(self, charging_station, **kwargs):
        """Record the station's boot data and accept the registration.

        Keys of ``charging_station`` that match an attribute already present
        on the loaded database row are copied onto it. The heartbeat interval
        in the reply is read from ``CS_HEARTBEAT_INTERVAL`` (default 1800).
        """
        with SessionLocal() as db:
            db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.identity == self.id).first()
            db_chargepoint.last_seen = datetime.now(UTC)
            for key, value in charging_station.items():
                if key in db_chargepoint.__dict__:
                    setattr(db_chargepoint, key, value)
            db.commit()
        return call_result.BootNotificationPayload(
            current_time=datetime.now(UTC).isoformat(),
            interval=int(os.getenv("CS_HEARTBEAT_INTERVAL", "1800")),
            status=RegistrationStatusType.accepted
        )

    @after(Action.BootNotification)
    async def after_boot_notification(self, **kwargs):
        """Request a full inventory base report once the boot was answered."""
        await self.call(payload=GetBaseReportPayload(request_id=0, report_base="FullInventory"))

    @on(Action.NotifyReport)
    async def on_notify_report(self, report_data=None, **kwargs):
        """Store every reported variable for this charge point.

        ``report_data`` defaults to ``None`` because the field is optional in
        the request; a report without entries is simply acknowledged.
        """
        with SessionLocal() as db:
            db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.identity == self.id).first()
            for entry in report_data or []:
                await create_or_update_variable(
                    chargepoint_id=db_chargepoint.id,
                    report_data=entry
                )
        return call_result.NotifyReportPayload()

    @on(Action.Heartbeat)
    async def on_heartbeat_request(self):
        """Refresh ``last_seen`` and reply with the current UTC time."""
        await self.__update_last_seen()
        return call_result.HeartbeatPayload(
            current_time=datetime.now(UTC).isoformat()
        )

    @on(Action.StatusNotification)
    async def on_status_notification(self, evse_id: int, connector_id: int, connector_status: str, **kwargs):
        """Create or update the connector row with the reported status."""
        with SessionLocal() as db:
            db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.identity == self.id).first()
            db_chargepoint.last_seen = datetime.now(UTC)
            db_connector = db.query(DbConnector).filter(
                DbConnector.chargepoint_id == db_chargepoint.id,
                DbConnector.evse == evse_id,
                DbConnector.index == connector_id
            ).first()
            if db_connector is None:
                db_connector = DbConnector(
                    chargepoint_id=db_chargepoint.id,
                    evse=evse_id,
                    index=connector_id,
                    status=ConnectorStatus(connector_status)
                )
                db.add(db_connector)
            else:
                db_connector.status = ConnectorStatus(connector_status)
            db.commit()
        return call_result.StatusNotificationPayload()

    @on(Action.Authorize)
    async def on_authorize(self, id_token, **kwargs):
        """Answer an authorization request for ``id_token``."""
        await self.__update_last_seen()
        id_token_info = await self.__get_id_token_info(id_token)
        return call_result.AuthorizePayload(id_token_info)

    @on(Action.TransactionEvent)
    async def on_transaction_event(
        self,
        event_type,
        timestamp,
        trigger_reason,
        transaction_info,
        id_token=None,
        meter_value=None,
        **kwargs
    ):
        """Persist a transaction start, update or end.

        Args:
            event_type: Transaction event type string (started/updated/ended).
            timestamp: ISO 8601 time of the event.
            trigger_reason: Reason string; stored as end reason on ``ended``.
            transaction_info: Mapping containing ``"transaction_id"``.
            id_token: Optional token mapping. When given it is authorized and
                the resulting info is returned to the station. Defaults to
                ``None`` so requests without the field do not fail on dispatch.
            meter_value: Optional list of meter value entries. Defaults to
                ``None`` for the same reason.

        Returns:
            A ``TransactionEventPayload``, carrying ``id_token_info`` only
            when an ``id_token`` was supplied.
        """
        id_token_info = None
        if id_token is not None:
            id_token_info = await self.__get_id_token_info(id_token)

        with SessionLocal() as db:
            chargepoint = db.query(DbChargePoint).filter(DbChargePoint.identity == self.id).first()
            chargepoint.last_seen = datetime.now(UTC)

            if event_type == str(TransactionEventType.started):
                transaction = DbTransaction(
                    id=transaction_info["transaction_id"],
                    status=TransactionStatus.ONGOING,
                    started_at=datetime.fromisoformat(timestamp),
                    meter_start=self.__energy_register_reading(meter_value),
                    price=chargepoint.price,
                    chargepoint_id=chargepoint.id
                )
                self.__assign_user(transaction, id_token_info)
                db.add(transaction)
            elif event_type == str(TransactionEventType.updated):
                transaction = db.get(DbTransaction, transaction_info["transaction_id"])
                self.__store_meter_values(db, transaction, meter_value)
                self.__assign_user(transaction, id_token_info)
            elif event_type == str(TransactionEventType.ended):
                transaction = db.get(DbTransaction, transaction_info["transaction_id"])
                transaction.status = TransactionStatus.ENDED
                transaction.ended_at = datetime.fromisoformat(timestamp)
                transaction.end_reason = TransactionEventTriggerReason(trigger_reason)
                transaction.meter_end = self.__energy_register_reading(meter_value)
                self.__assign_user(transaction, id_token_info)
            db.commit()

        if id_token is not None:
            return call_result.TransactionEventPayload(id_token_info=id_token_info)
        return call_result.TransactionEventPayload()

    @on(Action.MeterValues)
    async def on_meter_values(self, **kwargs):
        """Acknowledge meter values without storing them."""
        return call_result.MeterValuesPayload()

    @on(Action.SecurityEventNotification)
    async def on_security_event_notification(self, **kwargs):
        """Acknowledge a security event notification without further action."""
        return call_result.SecurityEventNotificationPayload()