from uuid import UUID

from ocpp.v201.call import UpdateFirmware
from ocpp.v201.call_result import UpdateFirmware as UpdateFirmwareResult
from ocpp.v201.datatypes import FirmwareType

from app.database import SessionLocal
from app.models.chargepoint import ChargePoint
from app.models.firmware_update import FirmwareUpdate
from app.ocpp_proto import chargepoint_manager
from app.schemas.firmware_update import FirmwareUpdateCreate, FirmwareUpdateStatus


async def create_firmware_update(
    chargepoint_id: UUID, firmware_update: FirmwareUpdateCreate
) -> FirmwareUpdate:
    """Store a new firmware update record for a charge point.

    The record is created with status ``CREATED`` and a ``request_id`` that is
    one higher than the highest ``request_id`` already stored for the same
    charge point (or 1 when there is none).

    Args:
        chargepoint_id: Primary key of the target ``ChargePoint`` row.
        firmware_update: Values copied onto the new ``FirmwareUpdate`` row.

    Returns:
        The committed and refreshed ``FirmwareUpdate`` instance.

    Raises:
        LookupError: If no charge point with ``chargepoint_id`` exists.
    """
    with SessionLocal() as db:
        db_chargepoint = db.get(ChargePoint, chargepoint_id)
        if db_chargepoint is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise LookupError(f"Charge point {chargepoint_id} not found")

        # NOTE(review): this read-then-increment is not atomic; two concurrent
        # calls for the same charge point could compute the same request_id.
        # Presumably a DB constraint or low concurrency covers this — confirm.
        latest_firmware_update = (
            db.query(FirmwareUpdate)
            .filter(FirmwareUpdate.chargepoint_id == db_chargepoint.id)
            .order_by(FirmwareUpdate.request_id.desc())
            .first()
        )
        new_request_id = (
            latest_firmware_update.request_id + 1 if latest_firmware_update else 1
        )

        db_firmware_update = FirmwareUpdate(
            request_id=new_request_id,
            status=FirmwareUpdateStatus.CREATED,
            retries=firmware_update.retries,
            retry_interval=firmware_update.retry_interval,
            location=firmware_update.location,
            retrieve_date_time=firmware_update.retrieve_date_time,
            install_date_time=firmware_update.install_date_time,
            chargepoint_id=db_chargepoint.id,
            signing_certificate=firmware_update.signing_certificate,
            signature=firmware_update.signature,
        )
        db.add(db_firmware_update)
        db.commit()
        # Reload attributes so the instance stays readable after the session closes.
        db.refresh(db_firmware_update)
        return db_firmware_update


async def submit_firmware_update(firmware_update_id: UUID) -> tuple[FirmwareUpdate, str]:
    """Send a stored firmware update to its charge point as an UpdateFirmware call.

    When the charge point answers ``"Accepted"`` or ``"AcceptedCanceled"`` the
    record's status is set to ``SUBMITTED`` and committed; for any other
    answer the record is left untouched.

    Args:
        firmware_update_id: Primary key of the ``FirmwareUpdate`` row to send.

    Returns:
        A tuple of the ``FirmwareUpdate`` instance and the status reported in
        the charge point's response.

    Raises:
        LookupError: If no firmware update with ``firmware_update_id`` exists.

    Any exception raised by ``chargepoint_manager.call`` propagates unchanged
    (the previous implementation explicitly re-raised ``TimeoutError``).
    """
    with SessionLocal() as db:
        db_firmware_update = db.get(FirmwareUpdate, firmware_update_id)
        if db_firmware_update is None:
            raise LookupError(f"Firmware update {firmware_update_id} not found")

        # Guard the optional install time: calling .isoformat() on None would
        # raise AttributeError before the request is even sent.
        install_date_time = db_firmware_update.install_date_time
        firmware = FirmwareType(
            location=db_firmware_update.location,
            retrieve_date_time=db_firmware_update.retrieve_date_time.isoformat(),
            install_date_time=(
                install_date_time.isoformat() if install_date_time is not None else None
            ),
            signing_certificate=db_firmware_update.signing_certificate,
            signature=db_firmware_update.signature,
        )

        # NOTE(review): the DB session stays open while awaiting the charge
        # point's reply; acceptable only if calls are short — confirm.
        result: UpdateFirmwareResult = await chargepoint_manager.call(
            db_firmware_update.chargepoint_id,
            payload=UpdateFirmware(
                request_id=db_firmware_update.request_id,
                retries=db_firmware_update.retries,
                retry_interval=db_firmware_update.retry_interval,
                firmware=firmware,
            ),
        )

        if result.status in ("Accepted", "AcceptedCanceled"):
            db_firmware_update.status = FirmwareUpdateStatus.SUBMITTED
            db.commit()
            # commit() may expire the instance; refresh it (as
            # create_firmware_update does) so the caller can still read its
            # attributes once the session is closed.
            db.refresh(db_firmware_update)
        return db_firmware_update, result.status


async def update_firmware_status(
    chargepoint_identity: str, request_id: int, status: FirmwareUpdateStatus
) -> None:
    """Set the status of one firmware update, located by charge point identity and request id.

    Args:
        chargepoint_identity: Value matched against ``ChargePoint.identity``.
        request_id: Value matched against ``FirmwareUpdate.request_id`` for
            that charge point.
        status: New status; it is passed through ``FirmwareUpdateStatus(...)``
            before being stored, so a raw value of the enum is accepted too.

    Raises:
        LookupError: If the charge point, or a firmware update with the given
            ``request_id`` for it, does not exist.
        ValueError: If ``status`` is not a valid ``FirmwareUpdateStatus``
            (assuming it is a standard ``Enum`` — TODO confirm).
    """
    with SessionLocal() as db:
        db_chargepoint = (
            db.query(ChargePoint)
            .filter(ChargePoint.identity == chargepoint_identity)
            .first()
        )
        if db_chargepoint is None:
            raise LookupError(
                f"Charge point with identity {chargepoint_identity!r} not found"
            )

        db_firmware_update = (
            db.query(FirmwareUpdate)
            .filter(FirmwareUpdate.chargepoint_id == db_chargepoint.id)
            .filter(FirmwareUpdate.request_id == request_id)
            .first()
        )
        if db_firmware_update is None:
            raise LookupError(
                f"Firmware update with request_id {request_id!r} not found "
                f"for charge point {chargepoint_identity!r}"
            )

        db_firmware_update.status = FirmwareUpdateStatus(status)
        db.commit()