import base64
import binascii
import hmac

from starlette.authentication import (
    AuthCredentials,
    AuthenticationBackend,
    AuthenticationError,
    SimpleUser,
)

from app.database import SessionLocal
from app.models.chargepoint import ChargePoint


class BasicAuthBackend(AuthenticationBackend):
    """Authenticate connections with HTTP Basic credentials.

    The username is matched against ``ChargePoint.friendly_name`` and the
    password against the ``password`` attribute of the matching row.
    """

    async def authenticate(self, conn):
        """Validate the ``Authorization`` header of *conn*.

        Args:
            conn: The incoming connection; only ``conn.headers`` is read.

        Returns:
            A ``(AuthCredentials(["authenticated"]), chargepoint)`` tuple,
            where ``chargepoint`` is the matching ``ChargePoint`` row.

        Raises:
            AuthenticationError: If the header is missing, the scheme is not
                ``basic``, the credentials cannot be decoded, or no charge
                point matches the supplied username and password.
        """
        if "Authorization" not in conn.headers:
            raise AuthenticationError('No Authorization header provided')

        auth = conn.headers["Authorization"]
        try:
            # Unpacking raises ValueError unless the header is exactly
            # "<scheme> <credentials>".
            scheme, credentials = auth.split()
            if scheme.lower() != 'basic':
                raise AuthenticationError('Invalid authorization scheme')
            decoded = base64.b64decode(credentials).decode("ascii")
        except (ValueError, UnicodeDecodeError, binascii.Error) as exc:
            # Chain the cause so the underlying parse failure stays visible
            # in tracebacks and logs.
            raise AuthenticationError('Invalid basic auth credentials') from exc

        username, separator, password = decoded.partition(":")
        if not separator:
            # "user-id ':' password" is mandatory; without the colon the
            # payload is malformed rather than an empty password.
            raise AuthenticationError('Invalid basic auth credentials')

        # NOTE(review): this is a synchronous query inside an async method;
        # presumably it blocks the event loop while it runs -- confirm whether
        # it should be moved to a threadpool.
        with SessionLocal() as db:
            chargepoint = (
                db.query(ChargePoint)
                .filter(ChargePoint.friendly_name == username)
                .first()
            )
            stored_password = None if chargepoint is None else chargepoint.password

        # A missing row, a missing password, or a non-string password can
        # never match the supplied string, so reject them up front.
        if not isinstance(stored_password, str):
            raise AuthenticationError('Invalid basic auth credentials')

        # Constant-time comparison avoids leaking how much of the password
        # matched. Comparing bytes sidesteps compare_digest's ASCII-only
        # restriction on str arguments.
        if not hmac.compare_digest(
            stored_password.encode("utf-8"), password.encode("utf-8")
        ):
            raise AuthenticationError('Invalid basic auth credentials')

        return AuthCredentials(["authenticated"]), chargepoint