JWT Authentication Made Easy with FastAPI

Hritik Majgaonkar
8 min readFeb 29, 2024

--

Install FastAPI and other required dependancies —

#Ensure you have Python installed on your system.
pip install fastapi uvicorn python-jose databases[postgresql]
pip install fastapi uvicorn sqlalchemy alembic psycopg2-binary passlib python-multipart
#Have a PostgreSQL database

FastAPI User Guide Linkhttps://fastapi.tiangolo.com/tutorial/

Implementing FastAPI Authentication —

This authentication method involves exchanging a username and password for a token, which is then used to authenticate subsequent requests. Upon login, users provide their credentials, and if valid, receive a token containing user information. This token is stored securely on the client-side and sent with each request to the server. The server validates the token, ensuring its integrity, expiration, and potentially revokes it if compromised.

Defining schemas for

#schemas.py 

#Create a User model to store user information in the database

from pydantic import BaseModel, EmailStr
from typing import Optional, List, Dict
from datetime import datetime, date


class GetUser(BaseModel):
email: EmailStr
username: Optional[str]
role: int

class Config:
orm_mode = True
use_enum_values = True


class LoginUser(BaseModel):
email: EmailStr
password: str

class Config:
orm_mode = True
use_enum_values = True


class PostUser(BaseModel):
email: EmailStr
username: Optional[str]
password: str

class Config:
orm_mode = True
use_enum_values = True

Defining secret keys and algorithms that we will be using for Token creation -
secret_key and a refresh_secret_key for JWT authentication, along with specifying the HS256 algorithm. These keys are crucial for securing the JWT tokens used for authentication and authorization in the application. The secret_key is used to sign the JWT tokens, ensuring their integrity and authenticity, while the refresh_secret_key is used for refreshing expired tokens. The HS256 algorithm is a widely used hashing algorithm that provides a good balance between security and performance for JWT token verification.

#add this in.env

secret_key="5b4bb4e6fsddse7862a28986e67b7087f0a61385f28e32ce9284295a3ce2781afc97"
refresh_secret_key = "5b4bb4e6fe7862a28986e6dfdfd7b7087f0a61385f28e32ce9284295a3ce2781afc97-refresh"
algorithm="HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_MINUTES = 30*60

Creating .env file for storing configuration settings and sensitive information, such as API keys, database URLs, and other environment-specific variables.

# . env file
secret_key="5b4bb4e6fe7862a28986e67b7087f0a61385f28e32ce9284295a3ce2781afc97"
refresh_secret_key = "5b4bb4e6fe7862a28986e67b7087f0a61385f28e32ce9284295a3ce2781afc97-refresh"
algorithm="HS256"
adminapikey = "your secret key if needed"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_MINUTES = 30*60
timeout=60


# # local - Database
db_host="localhost"
db_port=5432
db_name="dbname"
db_pwd="postgres"
db_usr="postgres"

SERVER = ""

Handling the .env file with config.py -

The Config inner class specifies that the settings should be loaded from an .env file located in the same directory as the current Python file (Path(__file__).resolve().parent /

Then, when you create an instance of the Settings class (setting = Settings()), Pydantic will load the settings from the .env file and populate the class attributes with the values from the file.

#config.py
from pydantic_settings import BaseSettings
from pathlib import Path


class Settings(BaseSettings):
# database related
db_host: str
db_port: int
db_name: str
db_pwd: str
db_usr: str
port: str

# JWT Token Related
secret_key: str
refresh_secret_key : str
algorithm: str
timeout: int
ACCESS_TOKEN_EXPIRE_MINUTES : int
REFRESH_TOKEN_EXPIRE_MINUTES : int

# internal env
adminapikey: str

SERVER: str

class Config:
env_file = Path(Path(__file__).resolve().parent) / ".env"
print(f'environment created - {Path(Path(__file__).resolve().name)}')


setting = Settings()

In the below code snippet, we import CryptContext from passlib.context to handle password hashing and verification. We initialize pwd_context with the bcrypt hashing scheme and set deprecated="auto" to handle deprecated hashing schemes automatically.

The secure_pwd function takes a raw password as input, hashes it using the pwd_context, and returns the hashed password.

The verify_pwd function takes a plain password and its corresponding hash, and uses pwd_context to verify if the plain password matches the hash. It returns True if the password matches, and False otherwise.

Overall, these functions provide a simple and secure way to hash and verify passwords in a FastAPI application.

#utils/password.py

from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def secure_pwd(raw_password):
hashed = pwd_context.hash(raw_password)

return hashed
def verify_pwd(plain, hash):
return pwd_context.verify(plain, hash)

Create Database to store users and tokens if require

  • User: Represents a user with columns for id, username, full_name, email, hashed_password, and is_active.
  • Token: Represents a token with columns for id, token, and user_id.
#models.py

from sqlalchemy import Column, Integer, String, DateTime, Boolean
from sqlalchemy.ext.declarative import declarative_base
from passlib.context import CryptContext

Base = declarative_base()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

class User(Base):
__tablename__ = "users"

id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True)
#name = Column(String)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
role = Column(String, default="user")

class Token(Base):
__tablename__ = "tokens"

id = Column(Integer, primary_key=True, index=True)
token = Column(String, index=True)
user_id = Column(Integer)

Creating db.py -

For configuring the SQLAlchemy database engine and a session maker for interacting with the database.

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from fastapi import Depends
from pydantic import PostgresDsn
#add you database according to username,port,pwd,hostname and db

database_url = PostgresDsn.build(
scheme="postgresql",
user=setting.db_usr,
password=setting.db_pwd,
host=setting.db_host,
path=f"/{setting.db_name}",
)

SQLALCHEMY_DATABASE_URL = str(database_url)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

Creating utility functions — CRUD

  • get_user: Retrieves a user from the database based on the username.
  • create_user: Creates a new user in the database.
  • get_token: Retrieves a token from the database based on the token string.
  • create_token: Creates a new token in the database.
#crud.py or #in utils/auth.py
from sqlalchemy.orm import Session
from . import models, db
from pydantic import EmailStr

def get_user(db: Session, email: EmailStr):
return db.query(models.User).filter(models.User.email == email).first()

def create_user(db: Session, user: UserCreate):
db_user = models.User(username=user.email, username=user.username, hashed_password=user.hashed_password)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user

def get_token(db: Session, token: str):
return db.query(models.Token).filter(models.Token.token == token).first()

def create_token(db: Session, token: str, user_id: int):
db_token = models.Token(token=token, user_id=user_id)
db.add(db_token)
db.commit()
db.refresh(db_token)
return db_token

Auth methods to create access token, refresh token, verify and decode the token

#utils/auth.py or crud.py
from sqlalchemy.orm import Session
from utils.password import secure_pwd
from models import *
from schemas import *
from fastapi import HTTPException, status, Request
from config import setting
from datetime import date, datetime, timedelta, time
from typing import Union, Any, Optional
import jwt
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from config import setting

def create_access_token(subject: Union[str, Any], expires_delta: int = None) -> str:
if expires_delta is not None:
expires_delta = datetime.utcnow() + expires_delta
else:
expires_delta = datetime.utcnow() + timedelta(minutes=setting.ACCESS_TOKEN_EXPIRE_MINUTES)

to_encode = {"exp": expires_delta, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, setting.secret_key, setting.algorithm)
return encoded_jwt

def create_refresh_token(subject: Union[str, Any], expires_delta: int = None) -> str:
if expires_delta is not None:
expires_delta = datetime.utcnow() + expires_delta
else:
expires_delta = datetime.utcnow() + timedelta(minutes=setting.REFRESH_TOKEN_EXPIRE_MINUTES)

to_encode = {"exp": expires_delta, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, setting.refresh_secret_key, setting.algorithm)
return encoded_jwt

def decodeJWT(jwtoken: str):
try:
payload = jwt.decode(jwtoken,setting.secret_key, setting.algorithm)
return payload
except InvalidTokenError:
return None


class JWTBearer(HTTPBearer):
def __init__(self, auto_error: bool = True):
super(JWTBearer, self).__init__(auto_error=auto_error)

async def __call__(self, request: Request) -> Optional[str]:
credentials: HTTPAuthorizationCredentials = await super(JWTBearer, self).__call__(request)
if credentials:
if not credentials.scheme == "Bearer":
raise HTTPException(status_code=403, detail="Invalid authentication scheme.")
token = credentials.credentials
if not self.verify_jwt(token):
raise HTTPException(status_code=403, detail="Invalid token or expired token.")
return token
else:
raise HTTPException(status_code=403, detail="Invalid authorization code.")

def verify_jwt(self, jwtoken: str) -> bool:
try:
payload = decodeJWT(jwtoken)
return True
except jwt.ExpiredSignatureError:
return False
except jwt.JWTError:
return False

#jwt_bearer = JWTBearer()

This code snippet provides functions and a class for JWT (JSON Web Token) authentication in a FastAPI project. Let’s break down each part:

  1. create_access_token and create_refresh_token functions: These functions generate JWTs (access and refresh tokens) for a given subject (user identifier). They calculate the expiration time based on the provided expires_delta or default to a predefined expiration time (ACCESS_TOKEN_EXPIRE_MINUTES or REFRESH_TOKEN_EXPIRE_MINUTES). The tokens are encoded using the jwt.encode function from the jwt library.
  2. decodeJWT function: This function decodes a JWT to extract the payload. It uses the jwt.decode function from the jwt library, passing the JWT, the secret key, and the algorithm to verify and decode the token. If the token is invalid or expired, it returns None.
  3. JWTBearer class: This class is a custom authentication class that inherits from HTTPBearer, a class provided by FastAPI for handling bearer token authentication. It overrides the __call__ method to verify the JWT provided in the request's Authorization header. If the token is valid, it returns the token; otherwise, it raises an HTTPException with an appropriate error message.

4. The verify_jwt method in the JWTBearer class uses the decodeJWT function to verify the token. If the token is expired or invalid, it catches specific exceptions (jwt.ExpiredSignatureError and jwt.JWTError) and returns False, indicating the token is not valid.

Overall, these components work together to provide JWT authentication functionality in a FastAPI project, allowing users to generate tokens, decode tokens, and authenticate requests using JWTs.

# For Login and Registration of the user we use the above defined functions/methods below -

#routers/auth.py or authentication.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from db import *
from utils.auth or crud import get_user, create_user, create_access_token, create_refresh_token
from schemas import GetUser, PostUser, LoginUser
from datetime import date, datetime, timedelta, time
#from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm


route = APIRouter(prefix="/auth", tags=["Authentication"])
oauth2bearer = OAuth2PasswordBearer(tokenUrl = 'auth/login')

#Register new user using email, username, password
@route.post("/register", response_model=GetUser)
def register_user(payload: PostUser, db: Session = Depends(get_db)):

if not payload.email:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Please add Email",
)
user = get_user(db, payload.email)
if user:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"User with email {payload.email} already exists",
)
user = create_user(db, payload)
print(user)

return user


@route.post("/login")
def login_user(payload: LoginUser, db: Session = Depends(get_db)):
"""
Login user based on email and password
"""
if not payload.email:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Please add Phone number",
)

user = get_user(db, payload.email)
token = create_access_token(user.id, timedelta(minutes=30))
refresh = create_refresh_token(user.id,timedelta(minutes = 1008))

return {'access_token': token, 'token_type': 'bearer','refresh_token':refresh,"user_id":user.id}

This code defines two API routes for user authentication using FastAPI:

  1. Register User (/auth/register):
  • This route expects a POST request with a JSON payload containing the user's email, name, and password (PostUser schema).
  • It checks if the email is provided and then checks if a user with that email already exists in the database.
  • If the user does not exist, it creates a new user in the database using the create_user function and returns the user details (GetUser schema).

2 . Login User (/auth/login):

  • This route expects a POST request with a JSON payload containing the user's email and password (LoginUser schema).
  • It checks if the email is provided and retrieves the user from the database using the get_user_by_email function.
  • If the user exists, it generates an access token and a refresh token using the create_access_token and create_refresh_token functions, respectively.
  • It returns a JSON response containing the access token, token type, refresh token, and user ID.

Additionally, the code includes import statements for necessary modules (APIRouter, Depends, HTTPException, status, Session, etc.) and defines an OAuth2PasswordBearer instance for authentication. These routes provide basic functionality for user registration and login, including token generation for authentication.

If you want PasswordBearer Authentication then —


from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from crud or utils.auth import *


oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")

def get_user_by_id(user_id: int, db: Session) -> User:
"""
Get a user by ID
"""
return db.query(User).filter(User.id == user_id).first()

def get_current_user(token: str = Depends(JWTBearer())) -> User:
"""
Get current user from JWT token
"""
payload = decodeJWT(token)
if not payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token or expired token",
)
user_id = payload.get("sub")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token or expired token",
)
# Assuming you have a function to get user by id from the database
user = get_user_by_id(user_id) # Implement this function
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
return user

@route.post("/login")
def login_user(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
"""
Login user based on email and password
"""
user = get_user(db, form_data.username)
if not user or not user.check_password(form_data.password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)

token = create_access_token(user.id, timedelta(minutes=30))
refresh = create_refresh_token(user.id, timedelta(minutes=1008))

return {'access_token': token, 'token_type': 'bearer', 'refresh_token': refresh, "user_id": user.id}

@route.get("/users/me", response_model=GetUser)
def read_users_me(current_user: User = Depends(get_current_user)):
"""
Get current user details
"""
return current_user

File Structure for your reference -

project/

│ ── __init__.py
│ ── main.py
│ ── db.py # Database configuration and session handling
│ ── models.py # SQLAlchemy models or create RAW queries for interaction with database
│ ── schemas.py # Pydantic schemas
│ ── utils/
│ ├── password.py
│ └── auth.py. # JWT authentication functions
│ ── routes/
│ └── auth.py # Authentication routes
├── .env # Environment variables├── requirements.txt # Python dependencies

├── config.py. # .env linking
├── requirements.txt. # Python dependencies
└── optional — run.py. # Script to run the FastAPI application

--

--