Implement transaction handling

This commit is contained in:
Oliver Traber 2024-04-19 00:08:29 +02:00
parent 161c6aa027
commit a65dee8962
Signed by: Bluemedia
GPG key ID: C0674B105057136C
16 changed files with 331 additions and 94 deletions

1
.gitignore vendored
View file

@ -1,2 +1,3 @@
**/__pycache__
simple-ocpp-cs.db
.env

View file

@ -1,13 +1,20 @@
from fastapi import FastAPI, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from dotenv import load_dotenv
from fastapi import FastAPI
from starlette.middleware.authentication import AuthenticationMiddleware
import uvicorn
load_dotenv()
from app.database import engine, Base
from app.models import *
from app.routers import chargepoint_v1, id_token_v1, ocpp_v1, user_v1
from app.routers import (
chargepoint_v1,
id_token_v1,
meter_value_v1,
ocpp_v1,
transaction_v1,
user_v1
)
from app.util.websocket_auth_backend import BasicAuthBackend
Base.metadata.create_all(bind=engine)
@ -29,11 +36,10 @@ def create_app():
app.include_router(chargepoint_v1.router, prefix="/v1")
app.include_router(id_token_v1.router, prefix="/v1")
app.include_router(user_v1.router, prefix="/v1")
app.include_router(meter_value_v1.router, prefix="/v1")
app.include_router(transaction_v1.router, prefix="/v1")
app.mount(path="/v1/ocpp", app=create_ocpp_app())
return app
app = create_app()
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")

View file

@ -1,5 +1,5 @@
import uuid
from sqlalchemy import Uuid, Boolean, Column, DateTime, String
from sqlalchemy import ForeignKey, Numeric, Uuid, Boolean, Column, DateTime, String
from sqlalchemy.orm import relationship
from app.database import Base
@ -8,9 +8,10 @@ class ChargePoint(Base):
__tablename__ = "chargepoints"
id = Column(Uuid, primary_key=True, default=uuid.uuid4)
friendly_name = Column(String, unique=True, index=True)
identity = Column(String, unique=True, index=True)
is_active = Column(Boolean, default=True)
password = Column(String)
price = Column(Numeric(10, 2))
last_seen = Column(DateTime, nullable=True)
vendor_name = Column(String, nullable=True)
@ -18,4 +19,7 @@ class ChargePoint(Base):
serial_number = Column(String, nullable=True)
firmware_version = Column(String, nullable=True)
learn_user_id = Column(Uuid, ForeignKey("users.id"), nullable=True)
learn_until = Column(DateTime, nullable=True)
connectors = relationship("Connector", cascade="delete, delete-orphan")

View file

@ -4,14 +4,14 @@ from sqlalchemy import Uuid, Column, DateTime, Enum, Float, ForeignKey, String
from app.database import Base
from app.schemas.meter_value import Measurand, PhaseType
class Transaction(Base):
class MeterValue(Base):
__tablename__ = "meter_values"
id = Column(Uuid, primary_key=True, default=uuid.uuid4)
timestamp = Column(DateTime, index=True)
measurand = Column(Enum(Measurand))
phase_type = Column(Enum(PhaseType))
unit = Column(String)
measurand = Column(Enum(Measurand), index=True)
phase_type = Column(Enum(PhaseType), nullable=True)
unit = Column(String, nullable=True)
value = Column(Float)
transaction_id = Column(Uuid, ForeignKey("transactions.id"), index=True)
transaction_id = Column(String, ForeignKey("transactions.id"), index=True)

View file

@ -1,6 +1,4 @@
import uuid
from sqlalchemy import Uuid, Column, DateTime, Enum, Float, ForeignKey
from sqlalchemy.orm import relationship, backref
from sqlalchemy import String, Uuid, Column, DateTime, Enum, Numeric, ForeignKey
from app.schemas.transaction import TransactionEventTriggerReason, TransactionStatus
from app.database import Base
@ -8,13 +6,14 @@ from app.database import Base
class Transaction(Base):
__tablename__ = "transactions"
id = Column(Uuid, primary_key=True, default=uuid.uuid4)
id = Column(String, primary_key=True)
status = Column(Enum(TransactionStatus), index=True)
started_at = Column(DateTime, index=True)
ended_at = Column(DateTime, nullable=True, index=True)
meter_start = Column(Float)
meter_end = Column(Float, nullable=True)
end_reason = Column(Enum(TransactionEventTriggerReason))
meter_start = Column(Numeric(10,2))
meter_end = Column(Numeric(10,2), nullable=True)
end_reason = Column(Enum(TransactionEventTriggerReason), nullable=True)
price = Column(Numeric(10,2))
connector_id = Column(Uuid, ForeignKey("connectors.id"), index=True)
id_token_id = Column(Uuid, ForeignKey("id_tokens.id"), index= True)
user_id = Column(Uuid, ForeignKey("users.id"), nullable=True, index=True)
chargepoint_id = Column(Uuid, ForeignKey("chargepoints.id"), index=True)

View file

@ -12,3 +12,4 @@ class User(Base):
is_active = Column(Boolean, default=True)
id_tokens = relationship("IdToken", back_populates="owner", cascade="delete, delete-orphan")
transactions = relationship("Transaction", cascade="delete, delete-orphan")

View file

@ -1,24 +1,61 @@
from datetime import datetime, UTC
import os
from uuid import UUID
from ocpp.routing import on
from ocpp.v201 import ChargePoint as cp
from ocpp.v201 import call_result
from ocpp.v201.datatypes import IdTokenInfoType, IdTokenType
from ocpp.v201.enums import Action, RegistrationStatusType, AuthorizationStatusType, IdTokenType as IdTokenEnumType
from ocpp.v201.enums import Action, RegistrationStatusType, AuthorizationStatusType, IdTokenType as IdTokenEnumType, TransactionEventType
from app.database import SessionLocal
from app.models.chargepoint import ChargePoint as DbChargePoint
from app.models.connector import Connector
from app.models.id_token import IdToken
from app.models.connector import Connector as DbConnector
from app.models.id_token import IdToken as DbIdToken
from app.models.transaction import Transaction as DbTransaction
from app.models.meter_value import MeterValue as DbMeterValue
from app.schemas.connector import ConnectorStatus
from app.schemas.transaction import TransactionStatus, TransactionEventTriggerReason
from app.schemas.meter_value import Measurand, PhaseType
class ChargePoint(cp):
async def __update_last_seen(self):
with SessionLocal() as db:
db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.identity == self.id).first()
db_chargepoint.last_seen = datetime.now(UTC)
db.commit()
async def __get_id_token_info(self, id_token) -> IdTokenInfoType:
if id_token["type"] not in ["ISO14443", "ISO15693"]:
return IdTokenInfoType(
status=AuthorizationStatusType.invalid
)
with SessionLocal() as db:
db_id_token = db.query(DbIdToken).filter(DbIdToken.token == id_token["id_token"]).first()
if db_id_token == None:
return IdTokenInfoType(
status=AuthorizationStatusType.unknown
)
if db_id_token.is_active == False:
id_token_info=IdTokenInfoType(
status=AuthorizationStatusType.blocked
)
else:
id_token_info=IdTokenInfoType(
status=AuthorizationStatusType.accepted,
group_id_token=IdTokenType(
type=IdTokenEnumType.central,
id_token=str(db_id_token.owner_id)
)
)
return id_token_info
@on(Action.BootNotification)
async def on_boot_notification(self, charging_station, **kwargs):
with SessionLocal() as db:
db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.friendly_name == self.id).first()
db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.identity == self.id).first()
db_chargepoint.last_seen = datetime.now(UTC)
for key in charging_station.keys():
if key in db_chargepoint.__dict__:
@ -32,10 +69,7 @@ class ChargePoint(cp):
@on(Action.Heartbeat)
async def on_heartbeat_request(self):
with SessionLocal() as db:
db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.friendly_name == self.id).first()
db_chargepoint.last_seen = datetime.now(UTC)
db.commit()
await self.__update_last_seen()
return call_result.HeartbeatPayload(
current_time=datetime.now(UTC).isoformat()
)
@ -43,16 +77,16 @@ class ChargePoint(cp):
@on(Action.StatusNotification)
async def on_status_notification(self, evse_id: int, connector_id: int, connector_status: str, **kwargs):
with SessionLocal() as db:
db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.friendly_name == self.id).first()
db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.identity == self.id).first()
db_chargepoint.last_seen = datetime.now(UTC)
db_connector = db.query(Connector).filter(
Connector.chargepoint_id == db_chargepoint.id,
Connector.evse == evse_id,
Connector.index == connector_id
db_connector = db.query(DbConnector).filter(
DbConnector.chargepoint_id == db_chargepoint.id,
DbConnector.evse == evse_id,
DbConnector.index == connector_id
).first()
if db_connector == None:
db_connector = Connector(
db_connector = DbConnector(
chargepoint_id = db_chargepoint.id,
evse = evse_id,
index = connector_id,
@ -68,39 +102,95 @@ class ChargePoint(cp):
@on(Action.Authorize)
async def on_authorize(self, id_token, **kwargs):
if id_token["type"] not in ["ISO14443", "ISO15693"]:
return call_result.AuthorizePayload(
id_token_info=IdTokenInfoType(
status=AuthorizationStatusType.invalid
)
)
with SessionLocal() as db:
db_chargepoint = db.query(DbChargePoint).filter(DbChargePoint.friendly_name == self.id).first()
db_chargepoint.last_seen = datetime.now(UTC)
db_id_token = db.query(IdToken).filter(IdToken.token == id_token["id_token"]).first()
db.commit()
if db_id_token == None:
id_token_info=IdTokenInfoType(
status=AuthorizationStatusType.unknown
)
else:
if db_id_token.is_active == False:
id_token_info=IdTokenInfoType(
status=AuthorizationStatusType.blocked
)
else:
id_token_info=IdTokenInfoType(
status=AuthorizationStatusType.accepted,
group_id_token=IdTokenType(
type=IdTokenEnumType.central,
id_token=str(db_id_token.owner_id)
)
)
await self.__update_last_seen()
id_token_info = await self.__get_id_token_info(id_token)
return call_result.AuthorizePayload(id_token_info)
@on(Action.TransactionEvent)
async def on_transaction_event(self):
async def on_transaction_event(
self,
event_type,
timestamp,
trigger_reason,
transaction_info,
id_token,
meter_value,
**kwargs
):
if id_token != None:
id_token_info = await self.__get_id_token_info(id_token)
with SessionLocal() as db:
chargepoint = db.query(DbChargePoint).filter(DbChargePoint.identity == self.id).first()
chargepoint.last_seen = datetime.now(UTC)
if event_type == str(TransactionEventType.started):
meter_start=0
if meter_value != None:
for meter_value_entry in meter_value:
for sampled_value in meter_value_entry['sampled_value']:
if "measurand" in sampled_value.keys():
if sampled_value['measurand'] == str(Measurand.ENERGY_ACTIVE_IMPORT_REGISTER):
meter_start = sampled_value['value']
else:
meter_start = sampled_value['value']
transaction = DbTransaction(
id=transaction_info["transaction_id"],
status=TransactionStatus.ONGOING,
started_at=datetime.fromisoformat(timestamp),
meter_start=meter_start,
price=chargepoint.price,
chargepoint_id=chargepoint.id
)
if id_token != None:
if id_token_info.status == AuthorizationStatusType.accepted:
transaction.user_id = UUID(id_token_info.group_id_token.id_token)
db.add(transaction)
elif event_type == str(TransactionEventType.updated):
transaction = db.get(DbTransaction, transaction_info["transaction_id"])
if meter_value != None:
for meter_value_entry in meter_value:
timestamp = datetime.fromisoformat(meter_value_entry['timestamp'])
for sampled_value in meter_value_entry['sampled_value']:
db_meter_value = DbMeterValue()
db_meter_value.transaction_id = transaction.id
db_meter_value.timestamp = timestamp
if "measurand" in sampled_value.keys():
db_meter_value.measurand = Measurand(sampled_value['measurand'])
else:
db_meter_value.measurand = Measurand.ENERGY_ACTIVE_IMPORT_REGISTER
if "phase" in sampled_value.keys():
db_meter_value.phase_type = PhaseType(sampled_value['phase'])
if "unit_of_measure" in sampled_value.keys():
if "unit" in sampled_value['unit_of_measure']:
db_meter_value.unit = sampled_value['unit_of_measure']['unit']
else:
db_meter_value.unit = "Wh"
db_meter_value.value = sampled_value['value']
db.add(db_meter_value)
if id_token != None:
if id_token_info.status == AuthorizationStatusType.accepted:
transaction.user_id = UUID(id_token_info.group_id_token.id_token)
elif event_type == str(TransactionEventType.ended):
transaction = db.get(DbTransaction, transaction_info["transaction_id"])
transaction.status = TransactionStatus.ENDED
transaction.ended_at = datetime.fromisoformat(timestamp)
transaction.end_reason = TransactionEventTriggerReason(trigger_reason)
meter_end=0
if meter_value != None:
for meter_value_entry in meter_value:
for sampled_value in meter_value_entry['sampled_value']:
if "measurand" in sampled_value.keys():
if sampled_value['measurand'] == str(Measurand.ENERGY_ACTIVE_IMPORT_REGISTER):
meter_end = sampled_value['value']
else:
meter_end = sampled_value['value']
transaction.meter_end = meter_end
if id_token != None:
if id_token_info.status == AuthorizationStatusType.accepted:
transaction.user_id = UUID(id_token_info.group_id_token.id_token)
db.commit()
if id_token != None:
return call_result.TransactionEventPayload(id_token_info=id_token_info)
return call_result.TransactionEventPayload()

View file

@ -1,5 +1,6 @@
import random
import string
from datetime import datetime, timedelta, UTC
from uuid import UUID
from fastapi import APIRouter, HTTPException, Security
from fastapi.params import Depends
@ -18,7 +19,9 @@ from app.schemas.chargepoint import (
ChargePointResetRequest,
ChargePointResetResponse
)
from app.schemas.id_token import IdTokenLearnRequest, IdTokenLearnResponse
from app.models.chargepoint import ChargePoint as DbChargePoint
from app.models.user import User as DbUser
from app.security import get_api_key
router = APIRouter(
@ -77,9 +80,10 @@ async def create_chargepoint(
db: Session = Depends(get_db)
):
chargepoint_db = DbChargePoint(
friendly_name=chargepoint.friendly_name,
identity=chargepoint.identity,
is_active=chargepoint.is_active,
password=''.join(random.choice(string.ascii_letters + string.digits) for i in range(24))
password=''.join(random.choice(string.ascii_letters + string.digits) for i in range(24)),
price=chargepoint.price
)
db.add(chargepoint_db)
db.commit()
@ -139,3 +143,73 @@ async def reset_chargepoint(
return ChargePointResetResponse(status=response.status)
except TimeoutError:
raise HTTPException(status_code=503, detail="Chargepoint didn't respond in time.")
@router.post(path="/{chargepoint_id}/token-learning", status_code=201, response_model=IdTokenLearnResponse)
async def create_id_token_learn_request(
chargepoint_id: UUID,
learn_request: IdTokenLearnRequest,
api_key: str = Security(get_api_key),
db: Session = Depends(get_db)
):
chargepoint = db.get(DbChargePoint, chargepoint_id)
if chargepoint is None:
raise HTTPException(status_code=404, detail="Chargepoint not found")
owner = db.get(DbUser, learn_request.user_id)
if owner == None:
raise HTTPException(status_code=422, detail=[{
"loc": ["body", "user_id"],
"msg": "Target user not found",
"type": "invalid_relation"
}])
chargepoint.learn_user_id = learn_request.user_id
if learn_request.until == None:
chargepoint.learn_until = datetime.now(UTC) + timedelta(minutes=5)
else:
chargepoint.learn_until = learn_request.until
db.commit()
return IdTokenLearnResponse(
user_id=chargepoint.learn_user_id,
until=chargepoint.learn_until
)
@router.get(path="/{chargepoint_id}/token-learning", response_model=IdTokenLearnResponse)
async def get_id_token_learn_request(
chargepoint_id: UUID,
api_key: str = Security(get_api_key),
db: Session = Depends(get_db)
):
chargepoint = db.get(DbChargePoint, chargepoint_id)
if chargepoint is None:
raise HTTPException(status_code=404, detail="Chargepoint not found")
if chargepoint.learn_user_id == None:
raise HTTPException(status_code=404, detail="No active learning request")
return IdTokenLearnResponse(
user_id=chargepoint.learn_user_id,
until=chargepoint.learn_until
)
@router.delete(path="/{chargepoint_id}/token-learning", response_model=[])
async def get_id_token_learn_request(
chargepoint_id: UUID,
api_key: str = Security(get_api_key),
db: Session = Depends(get_db)
):
chargepoint = db.get(DbChargePoint, chargepoint_id)
if chargepoint is None:
raise HTTPException(status_code=404, detail="Chargepoint not found")
if chargepoint.learn_user_id == None:
raise HTTPException(status_code=404, detail="No active learning request")
chargepoint.learn_user_id = None
chargepoint.learn_until = None
db.commit()
return []

View file

@ -0,0 +1,22 @@
from fastapi import APIRouter, Depends
from fastapi.params import Security
from sqlalchemy.orm import Session
from app.security import get_api_key
from app.database import get_db
from app.schemas.meter_value import MeterValue
from app.models.meter_value import MeterValue as DbMeterValue
router = APIRouter(
prefix="/meter-values",
tags=["MeterValue (v1)"]
)
@router.get(path="", response_model=list[MeterValue])
async def get_meter_values(
skip: int = 0,
limit: int = 20,
api_key: str = Security(get_api_key),
db: Session = Depends(get_db)
):
return db.query(DbMeterValue).offset(skip).limit(limit).all()

View file

@ -7,19 +7,19 @@ from app.util.websocket_wrapper import WebSocketWrapper
router = APIRouter()
@router.websocket("/{charging_station_friendly_name}")
@router.websocket("/{chargepoint_identity}")
async def websocket_endpoint(
*,
websocket: WebSocket,
charging_station_friendly_name: str,
chargepoint_identity: str,
):
""" For every new charging station that connects, create a ChargePoint
instance and start listening for messages.
"""
if (websocket.user.friendly_name != charging_station_friendly_name):
if (websocket.user.identity != chargepoint_identity):
raise WebSocketException(code=1008, reason="Username doesn't match chargepoint identifier")
logging.info("Charging station '%s' (%s) connected", charging_station_friendly_name, websocket.user.id)
logging.info("Charging station '%s' (%s) connected", chargepoint_identity, websocket.user.id)
# Check protocols
try:
@ -43,5 +43,5 @@ async def websocket_endpoint(
# Accept connection and begin communication
await websocket.accept(subprotocol="ocpp2.0.1")
cp = ChargePoint(charging_station_friendly_name, WebSocketWrapper(websocket))
cp = ChargePoint(chargepoint_identity, WebSocketWrapper(websocket))
await chargepoint_manager.start(websocket.user.id, cp)

View file

@ -0,0 +1,22 @@
from fastapi import APIRouter, Depends
from fastapi.params import Security
from sqlalchemy.orm import Session
from app.security import get_api_key
from app.database import get_db
from app.schemas.transaction import Transaction
from app.models.transaction import Transaction as DbTransaction
router = APIRouter(
prefix="/transactions",
tags=["Transaction (v1)"]
)
@router.get(path="", response_model=list[Transaction])
async def get_transactions(
skip: int = 0,
limit: int = 20,
api_key: str = Security(get_api_key),
db: Session = Depends(get_db)
):
return db.query(DbTransaction).offset(skip).limit(limit).all()

View file

@ -1,4 +1,5 @@
from datetime import datetime
from decimal import Decimal
from typing import Optional
from uuid import UUID
from pydantic import BaseModel
@ -8,12 +9,14 @@ from app.schemas.connector import Connector
from ocpp.v201.enums import ResetType, ResetStatusType
class ChargePointBase(BaseModel):
friendly_name: str
identity: str
is_active: bool
price: Decimal
class ChargePointUpdate(BaseModel):
friendly_name: Optional[str] = None
identity: Optional[str] = None
is_active: Optional[bool] = None
price: Optional[Decimal]= None
class ChargePointCreate(ChargePointBase):
pass

View file

@ -1,3 +1,4 @@
from datetime import datetime
from typing import Optional
from uuid import UUID
from pydantic import BaseModel
@ -23,3 +24,12 @@ class IdToken(IdTokenBase):
class Config:
from_attributes = True
class IdTokenLearnBase(BaseModel):
user_id: UUID
class IdTokenLearnRequest(IdTokenLearnBase):
until: Optional[datetime] = None
class IdTokenLearnResponse(IdTokenLearnBase):
until: datetime

View file

@ -1,5 +1,7 @@
from datetime import datetime
from decimal import Decimal
import enum
from typing import Optional
from uuid import UUID
from pydantic import BaseModel
@ -46,9 +48,9 @@ class MeterValue(BaseModel):
id: UUID
timestamp: datetime
measurand: Measurand
phase_type: PhaseType
unit: str
value: float
phase_type: Optional[PhaseType] = None
unit: Optional[str] = None
value: Decimal
transaction_id: str
class Config:

View file

@ -1,5 +1,7 @@
from datetime import datetime
from decimal import Decimal
import enum
from typing import Optional
from uuid import UUID
from pydantic import BaseModel
@ -31,15 +33,16 @@ class TransactionEventTriggerReason(enum.Enum):
RESET_COMMAND = "ResetCommand"
class Transaction(BaseModel):
id: UUID
id: str
status: TransactionStatus
started_at: datetime
ended_at: datetime
meter_start: float
meter_end: float
end_reason: TransactionEventTriggerReason
connector_id: str
id_token_id: str
ended_at: Optional[datetime] = None
meter_start: Decimal
meter_end: Optional[Decimal] = None
end_reason: Optional[TransactionEventTriggerReason] = None
price: Decimal
user_id: Optional[UUID] = None
chargepoint_id: UUID
class Config:
from_attributes = True

View file

@ -1,7 +1,7 @@
import base64
import binascii
from starlette.authentication import (
AuthCredentials, AuthenticationBackend, AuthenticationError, SimpleUser
AuthCredentials, AuthenticationBackend, AuthenticationError
)
from app.database import SessionLocal
@ -24,7 +24,7 @@ class BasicAuthBackend(AuthenticationBackend):
username, _, password = decoded.partition(":")
with SessionLocal() as db:
chargepoint = db.query(ChargePoint).filter(ChargePoint.friendly_name == username).first()
chargepoint = db.query(ChargePoint).filter(ChargePoint.identity == username).first()
if chargepoint is None:
raise AuthenticationError('Invalid basic auth credentials')
if chargepoint.password != password: