Add firmware update logic
All checks were successful
ci/woodpecker/push/docker Pipeline was successful

This commit is contained in:
Oliver Traber 2025-03-21 16:40:49 +00:00 committed by BluemediaDev
parent 486977f828
commit 25c6556f42
Signed by: Bluemedia
GPG key ID: C0674B105057136C
8 changed files with 250 additions and 2 deletions

View file

@ -28,10 +28,13 @@ from app.schemas.chargepoint_variable import (
MutabilityType,
SetVariableStatusType
)
from app.schemas.firmware_update import FirmwareUpdate, FirmwareUpdateCreate, FirmwareUpdateSubmissionResponse
from app.models.chargepoint import ChargePoint as DbChargePoint
from app.models.user import User as DbUser
from app.models.chargepoint_variable import ChargepointVariable as DbChargepointVariable
from app.models.firmware_update import FirmwareUpdate as DbFirmwareUpdate
from app.security.jwt_bearer import JWTBearer
from app.services import firmware_service
router = APIRouter(
prefix="/chargepoints",
@ -293,3 +296,67 @@ async def update_chargepoint_variable(
return ChargepointVariableResponse(status=status)
except TimeoutError:
raise HTTPException(status_code=503, detail="Chargepoint didn't respond in time.")
@router.get(path="/{chargepoint_id}/firmware-updates", response_model=list[FirmwareUpdate])
async def get_firmware_updates(
chargepoint_id: UUID,
db: Session = Depends(get_db),
token: AccessToken = Depends(JWTBearer(required_roles=["administrator"])),
):
chargepoint = db.get(DbChargePoint, chargepoint_id)
if chargepoint is None:
raise HTTPException(status_code=404, detail="Chargepoint not found")
firmware_updates = db.query(DbFirmwareUpdate).filter(
DbFirmwareUpdate.chargepoint_id == chargepoint_id
).all()
return firmware_updates
@router.get(path="/{chargepoint_id}/firmware-updates/{firmware_update_id}", response_model=FirmwareUpdate)
async def get_firmware_update(
chargepoint_id: UUID,
firmware_update_id: UUID,
db: Session = Depends(get_db),
token: AccessToken = Depends(JWTBearer(required_roles=["administrator"])),
):
chargepoint = db.get(DbChargePoint, chargepoint_id)
if chargepoint is None:
raise HTTPException(status_code=404, detail="Chargepoint not found")
firmware_update = db.query(DbFirmwareUpdate).filter(
DbFirmwareUpdate.chargepoint_id == chargepoint_id,
DbFirmwareUpdate.id == firmware_update_id
).first()
if firmware_update is None:
raise HTTPException(status_code=404, detail="FirmwareUpdate not found")
return firmware_update
@router.post(path="/{chargepoint_id}/firmware-updates", response_model=FirmwareUpdate)
async def create_firmware_update(
chargepoint_id: UUID,
firmware_update: FirmwareUpdateCreate,
db: Session = Depends(get_db),
token: AccessToken = Depends(JWTBearer(required_roles=["administrator"])),
):
chargepoint = db.get(DbChargePoint, chargepoint_id)
if chargepoint is None:
raise HTTPException(status_code=404, detail="Chargepoint not found")
firmware_update = await firmware_service.create_firmware_update(chargepoint_id, firmware_update)
return firmware_update
@router.post(path="/{chargepoint_id}/firmware-updates/{firmware_update_id}/submit", response_model=ChargePointResetResponse)
async def submit_firmware_update(
chargepoint_id: UUID,
firmware_update_id: UUID,
token: AccessToken = Depends(JWTBearer(required_roles=["administrator"])),
):
if chargepoint_manager.is_connected(chargepoint_id) == False:
raise HTTPException(status_code=503, detail="Chargepoint not connected.")
try:
firmware_update, status = await firmware_service.submit_firmware_update(firmware_update_id)
return FirmwareUpdateSubmissionResponse(firmware_update, status)
except TimeoutError:
raise HTTPException(status_code=503, detail="Chargepoint didn't respond in time.")