JWT Authentication in FastAPI: Building Secure APIs

Kevin Kim
10 min readJul 27, 2023

We live in a world where the security of the user is really important. The potential consequences of a security breach are immense. Your personal data such as your name and IDs may fall into the wrong hands, leaving users vulnerable to identity theft.

Objectives that this article will cover.

  • Hashing
  • OAuth2 with Password
  • Bearer with JWT tokens

Hashing Passwords

When creating a database that will handle users signing up, you do not want to store the password as it is why? well we will get to that shortly what you want is that when a user decides to use such a password as :

josh@1270

what you want to be saved to the database is something like this:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

Yes, gibberish. Let's talk about why you should do that.

  • Data Protection:

Hashing passwords converts the original plain-text passwords into irreversible, fixed-length hash codes. This way, even if the hashed data is somehow obtained by an attacker (e.g., through a data breach), they cannot reverse-engineer it to reveal the original passwords.

  • Preventing Password Reuse

Many users have a tendency to reuse passwords across different platforms. Hashing passwords ensures that even if users use the same password elsewhere, the hashed value will differ, reducing the impact of password reuse on security.

How do we implement hashing in your project then?

Installing the necessary dependencies.

pip install "passlib[bcrypt]"

Python package to handle password hashes. It supports many secure hashing algorithms and utilities to work with them. The recommended algorithm is “Bcrypt”.

Creating a utils.py file

The utils.py file will hold the necessary code for hashing passwords. We are creating a utility file that will hold that code since it is good practice and scaling can easily be done due to maintenance of good practice.

from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def hash_pass(password:str):
return pwd_context.hash(password)

Breakdown of the code:

  • pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto"): This creates an instance of the CryptContext class, specifying that the bcrypt hashing algorithm should be used. The deprecated="auto" parameter means that if bcrypt becomes deprecated in the future, passlib will automatically choose a more secure scheme. Cool right ??
  • def hash_pass(password: str) -> str:: This is the function definition for the hash_pass function, which takes a plain-text password as input and returns the hashed password as a string.
  • return pwd_context.hash(password): Inside the hash_pass function, pwd_context.hash(password) is called to hash the input password using bcrypt. The resulting hash is then returned.

changing how post request should be to handle hashing:

@router.post('/', status_code=status.HTTP_201_CREATED,response_model=schemas.UserOutput)
def create_users(user:schemas.CreateUser, db:Session = Depends(get_db)):

# Hash The Password
hashed_pass = hash_pass(user.password)

user.password = hashed_pass

new_user = models.User(**user.dict())
db.add(new_user)
db.commit()
db.refresh(new_user)

return new_user

Breakdown of the code:

  • @router.post('/', status_code=status.HTTP_201_CREATED, response_model=schemas.UserOutput): This is a FastAPI decorator for a POST request to the specified endpoint ('/'). It indicates that the route is used to create new users and should return a response with status code 201 (Created) upon successful creation. The response_model parameter ensures that the response data is validated against the schemas.UserOutput model before being returned.
  • def create_users(user: schemas.CreateUser, db: Session = Depends(get_db)):: This is the route function for creating users. It takes a request body (user) containing the user details based on the schemas.CreateUser model and a database session (db) obtained using the get_db dependency.
  • hashed_pass = hash_pass(user.password): The incoming user password is hashed using the hash_pass function.
  • user.password = hashed_pass: The user.password attribute is updated with the hashed password before creating the new user in the database. This ensures that the user's password is stored securely in hashed form.
  • new_user = models.User(**user.dict()): A new models.User instance is created using the user data from the user object.
  • db.commit(): The transaction is committed, persisting the new user in the database.
  • db.refresh(new_user): The database session is refreshed to ensure that it contains the latest state of the newly created user.
  • return new_user: The newly created user, represented as a schemas.UserOutput object is returned as the API response

Schemas used to perform the processes:

We will talk more about the `Token` and Datatoken when we reach JWT. What we are essentially doing is that we are telling our API is that we expect a certain type of response in accordance with what we want.

For such clear data validation,we are using a python library called pydantic provides a concise and elegant way to define and validate data models in Python applications. It also has a built in regex system for validating how your email is to be set-up. How great is that, no more writing regular expressions.

Responses:

Postman-response

Database-Response

The screenshot represents the email and password of the user which has been hashed.

JWT Authentication

A brief introduction into what JWT is all about before we get into actual implementation.

What is JWT?

JWT stands for JSON Web Token. It is a standard for representing claims securely between two parties. A JWT consists of three parts: a header, a payload, and a signature. These parts are encoded into a single string, which can be easily transmitted between systems.

It looks something like this:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

Note:

It is not encrypted, so, anyone could recover the information from the contents. But it’s signed. So, when you receive a token that you emitted, you can verify that you actually emitted it.

JWT Structure:

It consists of:

  • Header: Contains metadata about the token, such as the algorithm used for signing the token.
  • Payload: Also known as the claims, it contains the information (data) that needs to be transmitted, such as the user’s identity, permissions, and additional metadata.
  • Signature: To ensure the integrity of the token, a digital signature is generated using the header, payload, a secret key (known only to the server), and a specific algorithm defined in the header.

How it works:

When a user wants to access a protected resource they need to be authenticated. Here’s how the process works:

  • Authentication (Login): The user provides their credentials (e.g., username and password) to the server.
  • Token Generation: The server verifies the credentials and generates a JWT, which includes the user’s identity and any additional necessary information (e.g., user role, permissions).
  • Token Transmission: The JWT is returned to the client and is typically stored in a cookie, local storage, or session storage.
  • Subsequent Requests: For each subsequent request, the client includes the JWT in the request header or another secure method (e.g., Authorization header). This way, the server can identify and authenticate the user based on the JWT.
  • Token Verification: Upon receiving the JWT from the client, the server verifies its integrity and authenticity by recalculating the signature using the same secret key and algorithm. If the signature matches, the server trusts the information contained within the JWT.

Actual code implementation

Installing necessary dependencies

pip install "python-jose[cryptography]"

Generates and verifies the JWT tokens in Python.

Creating an Oauth file.

The file will handle authentication and token validation using OAuth2 token-based authentication.

The code in the file will resemble something like this: After explaining a few concepts and what each does we will delve into what that code does.

Generate a secure random secret key

It is good practice to generate your own random secret key that will be used to sign the JWT tokens.

openssl rand -hex 32

Copy the output to the variable SECRET_KEY (don't use the one in the example).

Algorithm

The variable ALGORITHM will be used to sign the JWT token and set it to "HS256".

Expiration-date

Create a variable for the expiration of the token.

A schema for the token endpoint

Remember when I talked about the token schemas, we are going to use them here.

class Token(BaseModel):
access_token: str
token_type: str


class DataToken(BaseModel):
id: Optional[str] = None

Explanation of the functions.

from datetime import timedelta, datetime

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlalchemy.orm import Session

import schemas
from database import get_db

import models

oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/login')

SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

def create_access_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"expire": expire.strftime("%Y-%m-%d %H:%M:%S")})

encoded_jwt = jwt.encode(to_encode, SECRET_KEY, ALGORITHM)

return encoded_jwt

def verify_token_access(token: str, credentials_exception):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM)

id: str = payload.get("user_id")

if id is None:
raise credentials_exception
token_data = schemas.DataToken(id=id)
except JWTError as e:
print(e)
raise credentials_exception

return token_data

def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not Validate Credentials",
headers={"WWW-Authenticate": "Bearer"})

token = verify_token_access(token, credentials_exception)

user = db.query(models.User).filter(models.User.id == token.id).first()

return user
  • The oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/login'): This sets up the OAuth2 password bearer authentication scheme for token-based authentication. Basically what we are doing is that we are setting up an end-point that allows the user to log in.
  • The create_access_token(data: dict) is used to create an access token (JWT) by encoding the provided data with an expiration time. The token includes a “user_id” claim and is encoded using the SECRET_KEY and ALGORITHM.
  • The verify_token_access(token: str, credentials_exception): This function verifies the access token by decoding it using the SECRET_KEY and ALGORITHM. If the token is valid, it extracts the "user_id" claim and returns a data object containing the user ID. If the token is invalid or expired, it raises a credentials_exception.
  • The get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)): This function is a dependency that retrieves the currently authenticated user from the access token. It uses the verify_token_access function to validate the token and extract the user ID. It then fetches the corresponding user from the database using user-ID and returns the user object.

Updating our utils.py file to contain another function that verifies the hashed password to the original password.

def verify_password(non_hashed_pass, hashed_pass):
return pwd_context.verify(non_hashed_pass, hashed_pass)

The function only does one thing. It verifies that the non-hashed password is equal to the hashed password.

The Login function

  • routers = APIRouter(tags=['Authentication']): This creates an APIRouter instance for handling authentication-related endpoints, and it adds the 'Authentication' tag to categorize these endpoints in the API documentation.
  • @routers.post('/login', response_model=schemas.Token): This is a FastAPI decorator for a POST request to the '/login' endpoint. It indicates that this route is responsible for user login, and it will return a response with the JSON model defined schemas.Token upon successful login.
  • def login(userdetails: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):: route function for user login. It takes an OAuth2PasswordRequestForm object, which FastAPI provides for handling OAuth2 password grant type authentication. The OAuth2PasswordRequestForm contains the username which is the email in this instance and password entered by the user during login. Additionally, the function depends on the database session obtained using the get_db dependency.
  • user = db.query(models.User).filter(models.User.email == userdetails.username).first(): The function fetches the user with the specified email (username) from the database using the provided email from the login form.
  • if not user: ...: If no user is found with the provided email, the function raises an HTTPException with a status code of 401 (Unauthorized) and informs that the user does not exist.
  • if not utils.verify_password(userdetails.password, user.password): ...: If the user is found, the function calls the utils.verify_password function to check if the provided password matches the hashed password stored in the database. If the passwords do not match, the function raises an HTTPException with a status code of 401 (Unauthorized) and indicates that the passwords do not match.
  • access_token = create_access_token(data={"user_id": user.id}): If the provided credentials are valid, the function generates an access token (JWT) using the create_access_token function. It includes the "user_id" claim in the token payload.
  • return {"access_token": access_token, "token_type": "bearer"}: The function returns a dictionary containing the access token and the token type (bearer) as the response. This access token can be used for subsequent authenticated requests.

That’s about it for all the code we need to carry out authentication using JWT.

Testing that it actually works using Postman

It works !!!!

As you wrap up this insightful article, I want to share something extraordinary with you. Beyond crafting compelling content, I lead a freelance group specializing in innovative software solutions. 🌐✨

Imagine having a website that not only looks stunning but functions seamlessly, APIs that enhance your operations, and the power of data visualization and machine learning at your fingertips. That’s what we bring to the table! 💻🚀

Whether you’re a visionary entrepreneur or a growing SME, our team is here to turn your tech dreams into reality. Let’s redefine what’s possible. Connect with us on Gmail, LinkedIn, or WhatsApp, and let’s embark on a journey of digital transformation together. Your success story begins now!

Reach out to us at:

Gmail: mbs.team.general@gmail.com

LinkedIn: https://www.linkedin.com/in/nourmibrahimmbs/

Whatsapp: +20–109–486–7014

--

--

Kevin Kim

Creating awesome stuff and sharing it here! Passionate about Tech, AI and anything that includes Growth.