simple-ocpp-cs/app/services/transaction_service.py
BluemediaGER ae2fe763f1
Some checks failed
ci/woodpecker/push/docker Pipeline was successful
ci/woodpecker/cron/docker Pipeline failed
Refactor chargepoint logic
2024-07-25 21:34:31 +02:00

86 lines
No EOL
3.4 KiB
Python

from datetime import datetime
from typing import Optional
from uuid import UUID
from app.database import SessionLocal
from app.models.chargepoint import ChargePoint
from app.models.transaction import Transaction
from app.schemas.meter_value import Measurand
from app.schemas.transaction import TransactionEventTriggerReason, TransactionStatus
from app.services import meter_value_service
async def create_transaction(
    chargepoint_identity: str,
    user_id: UUID,
    timestamp: datetime,
    transaction_info,
    transaction_data
) -> None:
    """Persist a new ongoing transaction for a chargepoint.

    The start meter reading is taken from the sampled values found under
    ``transaction_data["meter_value"]`` (when that key is present). A sample
    is used when it has no ``"measurand"`` key or when its measurand equals
    ``str(Measurand.ENERGY_ACTIVE_IMPORT_REGISTER)``. The last matching
    sample wins; if nothing matches, the reading stays ``0``.

    Args:
        chargepoint_identity: Value matched against ``ChargePoint.identity``.
        user_id: Stored as the transaction's ``user_id``.
        timestamp: Stored as the transaction's ``started_at``.
        transaction_info: Mapping that provides ``"transaction_id"``.
        transaction_data: Mapping that may contain a ``"meter_value"`` list
            whose entries each carry a ``"sampled_value"`` list.

    Raises:
        ValueError: If no chargepoint with ``chargepoint_identity`` exists.
    """
    with SessionLocal() as db:
        chargepoint = (
            db.query(ChargePoint)
            .filter(ChargePoint.identity == chargepoint_identity)
            .first()
        )
        # ``first()`` yields None for an unknown identity; fail with a clear
        # message instead of an AttributeError on ``chargepoint.price`` below.
        if chargepoint is None:
            raise ValueError(
                f"Unknown chargepoint identity: {chargepoint_identity!r}"
            )

        meter_start = 0
        if "meter_value" in transaction_data:
            # NOTE(review): relies on str() of the enum member matching the
            # measurand string sent by the chargepoint -- confirm in Measurand.
            energy_register = str(Measurand.ENERGY_ACTIVE_IMPORT_REGISTER)
            for meter_value_entry in transaction_data["meter_value"]:
                for sampled_value in meter_value_entry["sampled_value"]:
                    # A sample without a measurand is treated like an energy
                    # register sample (presumably the protocol default).
                    if (
                        "measurand" not in sampled_value
                        or sampled_value["measurand"] == energy_register
                    ):
                        meter_start = sampled_value["value"]

        transaction = Transaction(
            id=transaction_info["transaction_id"],
            status=TransactionStatus.ONGOING,
            started_at=timestamp,
            meter_start=meter_start,
            price=chargepoint.price,
            chargepoint_id=chargepoint.id,
            user_id=user_id,
        )
        db.add(transaction)
        db.commit()
async def update_transaction(
    transaction_id: str,
    transaction_data
) -> None:
    """Store the meter values of an update event for an ongoing transaction.

    Nothing happens when the transaction does not exist, when its status is
    not ``TransactionStatus.ONGOING``, or when ``transaction_data`` has no
    ``"meter_value"`` key.

    Args:
        transaction_id: Primary key used to look up the ``Transaction``.
        transaction_data: Mapping that may contain a ``"meter_value"`` list;
            each entry is handed to ``meter_value_service.create_meter_value``.
    """
    with SessionLocal() as db:
        transaction = db.get(Transaction, transaction_id)
        if transaction is None:
            return

        if (
            transaction.status == TransactionStatus.ONGOING
            and "meter_value" in transaction_data
        ):
            for meter_value_entry in transaction_data["meter_value"]:
                await meter_value_service.create_meter_value(
                    transaction_id=transaction.id,
                    meter_value_data=meter_value_entry,
                )
async def end_transaction(
    transaction_id: str,
    timestamp: datetime,
    trigger_reason: str,
    transaction_data,
    user_id: Optional[UUID]
) -> None:
    """Mark a transaction as ended and record its final meter reading.

    The end meter reading is taken from the sampled values found under
    ``transaction_data["meter_value"]`` (when that key is present). A sample
    is used when it has no ``"measurand"`` key or when its measurand equals
    ``str(Measurand.ENERGY_ACTIVE_IMPORT_REGISTER)``. The last matching
    sample wins; if nothing matches, the reading stays ``0``.

    Nothing happens when no transaction with ``transaction_id`` exists.

    Args:
        transaction_id: Primary key used to look up the ``Transaction``.
        timestamp: Stored as the transaction's ``ended_at``.
        trigger_reason: Converted with ``TransactionEventTriggerReason(...)``
            and stored as ``end_reason``; presumably raises ``ValueError``
            for an unknown value if that type is an ``Enum`` -- confirm.
        transaction_data: Mapping that may contain a ``"meter_value"`` list
            whose entries each carry a ``"sampled_value"`` list.
        user_id: When not ``None``, overwrites the transaction's ``user_id``.
    """
    with SessionLocal() as db:
        transaction = db.get(Transaction, transaction_id)
        if transaction is None:
            return

        # NOTE(review): an end event without an energy sample records a
        # meter_end of 0 -- confirm that this fallback is intended.
        meter_end = 0
        if "meter_value" in transaction_data:
            energy_register = str(Measurand.ENERGY_ACTIVE_IMPORT_REGISTER)
            for meter_value_entry in transaction_data["meter_value"]:
                for sampled_value in meter_value_entry["sampled_value"]:
                    # A sample without a measurand is treated like an energy
                    # register sample (presumably the protocol default).
                    if (
                        "measurand" not in sampled_value
                        or sampled_value["measurand"] == energy_register
                    ):
                        meter_end = sampled_value["value"]

        transaction.status = TransactionStatus.ENDED
        transaction.ended_at = timestamp
        transaction.end_reason = TransactionEventTriggerReason(trigger_reason)
        transaction.meter_end = meter_end
        if user_id is not None:
            transaction.user_id = user_id
        db.commit()