simple-ocpp-cs/app/util/websocket_auth_backend.py

33 lines
1.3 KiB
Python
Raw Permalink Normal View History

2024-03-28 21:23:25 +01:00
import base64
import binascii
from starlette.authentication import (
2024-04-19 00:08:29 +02:00
AuthCredentials, AuthenticationBackend, AuthenticationError
2024-03-28 21:23:25 +01:00
)
2024-04-13 22:43:03 +02:00
from app.database import SessionLocal
from app.models.chargepoint import ChargePoint
2024-03-28 21:23:25 +01:00
class BasicAuthBackend(AuthenticationBackend):
async def authenticate(self, conn):
if "Authorization" not in conn.headers:
raise AuthenticationError('No Authorization header provided')
auth = conn.headers["Authorization"]
try:
scheme, credentials = auth.split()
if scheme.lower() != 'basic':
raise AuthenticationError('Invalid authorization scheme')
decoded = base64.b64decode(credentials).decode("ascii")
except (ValueError, UnicodeDecodeError, binascii.Error) as exc:
raise AuthenticationError('Invalid basic auth credentials')
username, _, password = decoded.partition(":")
2024-04-13 22:43:03 +02:00
with SessionLocal() as db:
2024-04-19 00:08:29 +02:00
chargepoint = db.query(ChargePoint).filter(ChargePoint.identity == username).first()
2024-04-13 22:43:03 +02:00
if chargepoint is None:
raise AuthenticationError('Invalid basic auth credentials')
if chargepoint.password != password:
raise AuthenticationError('Invalid basic auth credentials')
return AuthCredentials(["authenticated"]), chargepoint