JWT Authentication Made Easy with FastAPI
Install FastAPI and the other required dependencies:
# Ensure you have Python installed on your system.
pip install fastapi uvicorn python-jose "databases[postgresql]"
pip install sqlalchemy alembic psycopg2-binary "passlib[bcrypt]" python-multipart
# config.py and schemas.py below also import pydantic_settings and EmailStr
pip install pydantic-settings "pydantic[email]"
# Have a PostgreSQL database available.
FastAPI User Guide Link — https://fastapi.tiangolo.com/tutorial/
Implementing FastAPI Authentication —
This authentication method exchanges a username and password for a token, which then authenticates subsequent requests. On login, users submit their credentials and, if these are valid, receive a token containing user information. The client stores the token securely and sends it with each request. The server validates the token by checking its signature and expiration, and can reject it if it has been revoked or compromised.
Defining schemas
# schemas.py
# Pydantic schemas that validate request bodies and shape API responses
from typing import Optional

from pydantic import BaseModel, ConfigDict, EmailStr


class GetUser(BaseModel):
    # from_attributes is the Pydantic v2 name for orm_mode
    model_config = ConfigDict(from_attributes=True, use_enum_values=True)

    email: EmailStr
    username: Optional[str] = None
    role: str  # models.User stores role as a string ("user" by default)


class LoginUser(BaseModel):
    model_config = ConfigDict(from_attributes=True, use_enum_values=True)

    email: EmailStr
    password: str


class PostUser(BaseModel):
    model_config = ConfigDict(from_attributes=True, use_enum_values=True)

    email: EmailStr
    username: Optional[str] = None
    password: str
Next, define the secret keys and the algorithm used for token creation: a secret_key and a refresh_secret_key for JWT authentication, along with the HS256 algorithm. These keys are crucial for securing the JWTs used for authentication and authorization in the application. The secret_key signs access tokens, ensuring their integrity and authenticity, while the refresh_secret_key signs the longer-lived refresh tokens that are used to obtain new access tokens. HS256 (HMAC with SHA-256) is a widely used symmetric signing algorithm: the same key both signs and verifies a token, which keeps verification fast but means the key must stay on the server.
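To make the signing step concrete, here is a minimal standard-library sketch of what HS256 does. It is not how python-jose is implemented, and the helper names (`b64url`, `sign_hs256`, `verify_hs256`) and the demo keys are made up for illustration. It also shows why a token signed with the refresh key does not verify under the access key.

```python
import base64
import hashlib
import hmac
import json
import time


def b64url(data: bytes) -> str:
    # JWTs use URL-safe base64 without padding
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sign_hs256(payload: dict, key: str) -> str:
    # header.payload, each JSON-encoded then base64url-encoded
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(
        key.encode("utf-8"), signing_input.encode("utf-8"), hashlib.sha256
    ).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_hs256(token: str, key: str) -> bool:
    # Recompute the HMAC over header.payload and compare it to the signature part
    try:
        signing_input, signature = token.rsplit(".", 1)
    except ValueError:
        return False
    expected = hmac.new(
        key.encode("utf-8"), signing_input.encode("utf-8"), hashlib.sha256
    ).digest()
    # compare_digest avoids leaking information through timing differences
    return hmac.compare_digest(b64url(expected).encode("utf-8"), signature.encode("utf-8"))


# Hypothetical keys, for the demo only
ACCESS_KEY = "demo-access-key"
REFRESH_KEY = "demo-refresh-key"

access_token = sign_hs256({"sub": "42", "exp": int(time.time()) + 1800}, ACCESS_KEY)
print(verify_hs256(access_token, ACCESS_KEY))   # signature matches
print(verify_hs256(access_token, REFRESH_KEY))  # wrong key, signature does not match
```

This sketch only checks the signature. A real library such as python-jose also validates claims like exp, which is why the article relies on it rather than hand-rolled code.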
# add this in .env
secret_key="5b4bb4e6fsddse7862a28986e67b7087f0a61385f28e32ce9284295a3ce2781afc97"
refresh_secret_key="5b4bb4e6fe7862a28986e6dfdfd7b7087f0a61385f28e32ce9284295a3ce2781afc97-refresh"
algorithm="HS256"
ACCESS_TOKEN_EXPIRE_MINUTES=30
# 30*60 minutes, written out because .env values are plain text and are not evaluated
REFRESH_TOKEN_EXPIRE_MINUTES=1800
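Rather than copying the sample keys above, you would normally generate your own. One way to do that, sketched here with Python's standard `secrets` module (the `generate_key` helper is a hypothetical name), is to print two independent random keys and paste them into .env:

```python
import secrets


def generate_key(num_bytes: int = 32) -> str:
    # 32 random bytes become a 64-character hex string
    return secrets.token_hex(num_bytes)


secret_key = generate_key()
refresh_secret_key = generate_key()

# Copy these two lines into your .env file
print(f'secret_key="{secret_key}"')
print(f'refresh_secret_key="{refresh_secret_key}"')
```

Using two unrelated keys keeps access tokens and refresh tokens from being interchangeable.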
Create a .env file to store configuration settings and sensitive information, such as API keys, database URLs, and other environment-specific variables.
# .env file
secret_key="5b4bb4e6fe7862a28986e67b7087f0a61385f28e32ce9284295a3ce2781afc97"
refresh_secret_key = "5b4bb4e6fe7862a28986e67b7087f0a61385f28e32ce9284295a3ce2781afc97-refresh"
algorithm="HS256"
adminapikey = "your secret key if needed"
ACCESS_TOKEN_EXPIRE_MINUTES=30
# 30*60 minutes, written out because .env values are plain text and are not evaluated
REFRESH_TOKEN_EXPIRE_MINUTES=1800
timeout=60
# local database
db_host="localhost"
db_port=5432
db_name="dbname"
db_pwd="postgres"
db_usr="postgres"
SERVER = ""
Handling the .env file with config.py:
The settings configuration specifies that the values should be loaded from a .env file located in the same directory as the current Python file (Path(__file__).resolve().parent / ".env"). Then, when you create an instance of the Settings class (setting = Settings()), Pydantic loads the settings from the .env file and populates the class attributes with the values from the file.
# config.py
from pathlib import Path

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Load values from the .env file that sits next to this module
    model_config = SettingsConfigDict(env_file=Path(__file__).resolve().parent / ".env")

    # database related
    db_host: str
    db_port: int
    db_name: str
    db_pwd: str
    db_usr: str
    port: str = "8000"  # assumed default: the sample .env does not define port

    # JWT token related
    secret_key: str
    refresh_secret_key: str
    algorithm: str
    timeout: int
    ACCESS_TOKEN_EXPIRE_MINUTES: int
    REFRESH_TOKEN_EXPIRE_MINUTES: int

    # internal env
    adminapikey: str
    SERVER: str


setting = Settings()
print(f"environment loaded - {Path(__file__).resolve().name}")
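To get a feel for what "loading and populating" means, here is a rough standard-library approximation. It is not how pydantic-settings works internally; `parse_env`, `load_settings` and `SCHEMA` are hypothetical names, and lower-casing the keys is a simplification. The point it illustrates is that every value in .env starts out as text and must convert cleanly to the declared type.

```python
def parse_env(text: str) -> dict:
    """Turn KEY=value lines into a dict of raw strings."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        values[key.strip().lower()] = value.strip().strip('"')
    return values


# Hypothetical subset of the Settings fields, with the type each one should have
SCHEMA = {"db_host": str, "db_port": int, "timeout": int}


def load_settings(text: str) -> dict:
    raw = parse_env(text)
    # Convert each raw string to its declared type; int("30*60") raises ValueError
    return {name: cast(raw[name]) for name, cast in SCHEMA.items()}


sample = 'db_host="localhost"\ndb_port=5432\ntimeout=60'
print(load_settings(sample))
```

An expression such as 30*60 is never evaluated in this sketch, so it fails the integer conversion. That is the reason to write numeric settings as literal values.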
In the code snippet below, we import CryptContext from passlib.context to handle password hashing and verification. We initialize pwd_context with the bcrypt hashing scheme and set deprecated="auto" to handle deprecated hashing schemes automatically.
The secure_pwd function takes a raw password as input, hashes it using the pwd_context, and returns the hashed password.
The verify_pwd function takes a plain password and its corresponding hash, and uses pwd_context to verify that the plain password matches the hash. It returns True if the password matches, and False otherwise.
Overall, these functions provide a simple and secure way to hash and verify passwords in a FastAPI application.
# utils/password.py
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


def secure_pwd(raw_password: str) -> str:
    # Hash the raw password with bcrypt
    return pwd_context.hash(raw_password)


def verify_pwd(plain: str, hashed: str) -> bool:
    # True if the plain password matches the stored hash
    return pwd_context.verify(plain, hashed)
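If you are curious why verification needs both the plain password and the stored hash, the sketch below shows the general idea of salted password hashing using only the standard library. Note the swap: it uses PBKDF2 from hashlib instead of bcrypt so that it runs without extra packages, and `hash_password` / `check_password` are hypothetical names, not passlib functions.

```python
import hashlib
import hmac
import os


def hash_password(raw_password: str, iterations: int = 100_000) -> str:
    # A fresh random salt makes two hashes of the same password differ
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", raw_password.encode("utf-8"), salt, iterations)
    # Keep the parameters, salt and digest together in one string
    return f"{iterations}${salt.hex()}${digest.hex()}"


def check_password(plain: str, stored: str) -> bool:
    # Re-derive the digest with the stored salt and iteration count, then compare
    iterations, salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", plain.encode("utf-8"), bytes.fromhex(salt_hex), int(iterations)
    )
    return hmac.compare_digest(digest.hex(), digest_hex)


stored_hash = hash_password("s3cret")
print(check_password("s3cret", stored_hash))
print(check_password("wrong", stored_hash))
```

In the application itself, keep using passlib's bcrypt context as shown above; this is only meant to illustrate the mechanism.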
Create database tables to store users and, if required, tokens.
User: Represents a user with columns for id, username, email, hashed_password, and role.
Token: Represents a token with columns for id, token, and user_id.
# models.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    # name = Column(String)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    role = Column(String, default="user")


class Token(Base):
    __tablename__ = "tokens"

    id = Column(Integer, primary_key=True, index=True)
    token = Column(String, index=True)
    user_id = Column(Integer)
Creating db.py:
This module configures the SQLAlchemy database engine and a session maker for interacting with the database.
# db.py
from pydantic import PostgresDsn
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

from config import setting

# Build the database URL from the username, password, host, port and database name in .env.
# Pydantic v2 keyword names are used here: username= (not user=) and path without a leading slash.
database_url = PostgresDsn.build(
    scheme="postgresql",
    username=setting.db_usr,
    password=setting.db_pwd,
    host=setting.db_host,
    port=setting.db_port,
    path=setting.db_name,
)
SQLALCHEMY_DATABASE_URL = str(database_url)

engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


def get_db():
    # Yield one session per request and always close it afterwards
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
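The try/yield/finally shape of get_db is what guarantees the session is closed. The sketch below imitates that lifecycle with a stand-in session class so it can run without a database. `FakeSession`, `get_fake_db` and `run_request` are hypothetical names, and `run_request` is only a simplified stand-in for what the framework does around a request.

```python
class FakeSession:
    """Stand-in for a SQLAlchemy session that only tracks whether it was closed."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def get_fake_db():
    # Same shape as get_db: hand out a session, then always close it
    db = FakeSession()
    try:
        yield db
    finally:
        db.close()


def run_request(handler):
    gen = get_fake_db()
    db = next(gen)  # runs the generator up to the yield
    try:
        return handler(db), db
    finally:
        gen.close()  # resumes the generator so its finally block runs


result, session = run_request(lambda db: "open" if not db.closed else "closed")
print(result, session.closed)
```

The handler sees an open session, and the session is closed once the request finishes, even if the handler raises.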
Creating utility functions (CRUD):
get_user: Retrieves a user from the database based on the email address.
create_user: Creates a new user in the database.
get_token: Retrieves a token from the database based on the token string.
create_token: Creates a new token in the database.
# crud.py (or in utils/auth.py)
from pydantic import EmailStr
from sqlalchemy.orm import Session

import models
from schemas import PostUser
from utils.password import secure_pwd


def get_user(db: Session, email: EmailStr):
    return db.query(models.User).filter(models.User.email == email).first()


def create_user(db: Session, user: PostUser):
    # Store only the bcrypt hash, never the raw password
    db_user = models.User(
        email=user.email,
        username=user.username,
        hashed_password=secure_pwd(user.password),
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


def get_token(db: Session, token: str):
    return db.query(models.Token).filter(models.Token.token == token).first()


def create_token(db: Session, token: str, user_id: int):
    db_token = models.Token(token=token, user_id=user_id)
    db.add(db_token)
    db.commit()
    db.refresh(db_token)
    return db_token
Auth methods to create access token, refresh token, verify and decode the token —
# utils/auth.py (or crud.py)
from datetime import datetime, timedelta, timezone
from typing import Any, Optional, Union

from fastapi import HTTPException, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt

from config import setting


def create_access_token(subject: Union[str, Any], expires_delta: Optional[timedelta] = None) -> str:
    if expires_delta is not None:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=setting.ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode = {"exp": expire, "sub": str(subject)}
    return jwt.encode(to_encode, setting.secret_key, setting.algorithm)


def create_refresh_token(subject: Union[str, Any], expires_delta: Optional[timedelta] = None) -> str:
    if expires_delta is not None:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=setting.REFRESH_TOKEN_EXPIRE_MINUTES)
    to_encode = {"exp": expire, "sub": str(subject)}
    # Refresh tokens are signed with their own key
    return jwt.encode(to_encode, setting.refresh_secret_key, setting.algorithm)


def decodeJWT(jwtoken: str) -> Optional[dict]:
    try:
        # Verifies the signature and the exp claim
        return jwt.decode(jwtoken, setting.secret_key, algorithms=[setting.algorithm])
    except JWTError:
        # Covers expired tokens too: ExpiredSignatureError subclasses JWTError
        return None


class JWTBearer(HTTPBearer):
    def __init__(self, auto_error: bool = True):
        super().__init__(auto_error=auto_error)

    async def __call__(self, request: Request) -> Optional[str]:
        credentials: Optional[HTTPAuthorizationCredentials] = await super().__call__(request)
        if not credentials:
            raise HTTPException(status_code=403, detail="Invalid authorization code.")
        if credentials.scheme != "Bearer":
            raise HTTPException(status_code=403, detail="Invalid authentication scheme.")
        token = credentials.credentials
        if not self.verify_jwt(token):
            raise HTTPException(status_code=403, detail="Invalid token or expired token.")
        return token

    def verify_jwt(self, jwtoken: str) -> bool:
        # decodeJWT returns None for an invalid or expired token instead of raising
        return decodeJWT(jwtoken) is not None


# jwt_bearer = JWTBearer()
This code snippet provides functions and a class for JWT (JSON Web Token) authentication in a FastAPI project. Let's break down each part:
1. create_access_token and create_refresh_token functions: These functions generate JWTs (access and refresh tokens) for a given subject (user identifier). They calculate the expiration time based on the provided expires_delta or default to a predefined expiration time (ACCESS_TOKEN_EXPIRE_MINUTES or REFRESH_TOKEN_EXPIRE_MINUTES). The tokens are encoded using the jwt.encode function from the python-jose library.
2. decodeJWT function: This function decodes a JWT to extract the payload. It uses the jwt.decode function from the python-jose library, passing the JWT, the secret key, and the algorithm to verify and decode the token. If the token is invalid or expired, it returns None.
3. JWTBearer class: This class is a custom authentication class that inherits from HTTPBearer, a class provided by FastAPI for handling bearer token authentication. It overrides the __call__ method to verify the JWT provided in the request's Authorization header. If the token is valid, it returns the token; otherwise, it raises an HTTPException with an appropriate error message.
4. The verify_jwt method in the JWTBearer class uses the decodeJWT function to verify the token. If the token is expired or invalid, it returns False, indicating the token is not valid.
Overall, these components work together to provide JWT authentication functionality in a FastAPI project, allowing users to generate tokens, decode tokens, and authenticate requests using JWTs.
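Before JWTBearer can verify anything, the token has to be pulled out of the Authorization header. HTTPBearer takes care of that for you; the sketch below only illustrates the idea with plain string handling, and `extract_bearer_token` is a hypothetical helper rather than a FastAPI function.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' value, or None."""
    if not authorization:
        return None
    # Split "Bearer <token>" into the scheme and the credentials
    scheme, _, credentials = authorization.partition(" ")
    if scheme.lower() != "bearer" or not credentials.strip():
        return None
    return credentials.strip()


print(extract_bearer_token("Bearer abc.def.ghi"))
print(extract_bearer_token("Basic dXNlcjpwYXNz"))
```

Only a value with the Bearer scheme and a non-empty token is accepted; everything else yields None, which corresponds to the cases where JWTBearer raises an HTTPException.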
For login and registration of the user, we use the functions defined above:
# routes/auth.py (or authentication.py)
from datetime import timedelta

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session

# If you merged the CRUD helpers into utils/auth.py, import them from there instead
from crud import create_user, get_user
from db import get_db
from schemas import GetUser, LoginUser, PostUser
from utils.auth import create_access_token, create_refresh_token
from utils.password import verify_pwd

route = APIRouter(prefix="/auth", tags=["Authentication"])
oauth2bearer = OAuth2PasswordBearer(tokenUrl="auth/login")


# Register a new user using email, username and password
@route.post("/register", response_model=GetUser)
def register_user(payload: PostUser, db: Session = Depends(get_db)):
    if not payload.email:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Please add Email",
        )
    user = get_user(db, payload.email)
    if user:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"User with email {payload.email} already exists",
        )
    return create_user(db, payload)


@route.post("/login")
def login_user(payload: LoginUser, db: Session = Depends(get_db)):
    """
    Login user based on email and password
    """
    if not payload.email:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Please add Email",
        )
    user = get_user(db, payload.email)
    # Reject unknown users and wrong passwords with the same error
    if not user or not verify_pwd(payload.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
        )
    token = create_access_token(user.id, timedelta(minutes=30))
    refresh = create_refresh_token(user.id, timedelta(minutes=1008))
    return {"access_token": token, "token_type": "bearer", "refresh_token": refresh, "user_id": user.id}
This code defines two API routes for user authentication using FastAPI:
1. Register User (/auth/register):
- This route expects a POST request with a JSON payload containing the user's email, username, and password (PostUser schema).
- It checks if the email is provided and then checks if a user with that email already exists in the database.
- If the user does not exist, it creates a new user in the database using the create_user function and returns the user details (GetUser schema).
2. Login User (/auth/login):
- This route expects a POST request with a JSON payload containing the user's email and password (LoginUser schema).
- It checks if the email is provided and retrieves the user from the database using the get_user function.
- For a valid user, it generates an access token and a refresh token using the create_access_token and create_refresh_token functions, respectively.
- It returns a JSON response containing the access token, token type, refresh token, and user ID.
Additionally, the code includes import statements for the necessary modules (APIRouter, Depends, HTTPException, status, Session, etc.) and defines an OAuth2PasswordBearer instance for authentication. These routes provide basic functionality for user registration and login, including token generation for authentication.
If you want PasswordBearer authentication instead:
# Add to routes/auth.py, where route = APIRouter(...) is defined
from datetime import timedelta

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

from crud import get_user
from db import get_db
from models import User
from schemas import GetUser
from utils.auth import JWTBearer, create_access_token, create_refresh_token, decodeJWT
from utils.password import verify_pwd

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")


def get_user_by_id(user_id: int, db: Session) -> User:
    """
    Get a user by ID
    """
    return db.query(User).filter(User.id == user_id).first()


def get_current_user(token: str = Depends(JWTBearer()), db: Session = Depends(get_db)) -> User:
    """
    Get current user from JWT token
    """
    payload = decodeJWT(token)
    if not payload:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token or expired token",
        )
    user_id = payload.get("sub")
    if not user_id:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token or expired token",
        )
    # "sub" is stored as a string in the token, so convert it back to an int
    user = get_user_by_id(int(user_id), db)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    return user


@route.post("/login")
def login_user(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    """
    Login user based on email and password
    """
    # The OAuth2 form field is called "username"; here it carries the email
    user = get_user(db, form_data.username)
    if not user or not verify_pwd(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
        )
    token = create_access_token(user.id, timedelta(minutes=30))
    refresh = create_refresh_token(user.id, timedelta(minutes=1008))
    return {"access_token": token, "token_type": "bearer", "refresh_token": refresh, "user_id": user.id}


@route.get("/users/me", response_model=GetUser)
def read_users_me(current_user: User = Depends(get_current_user)):
    """
    Get current user details
    """
    return current_user
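On the client side, the access_token returned by the login route goes into the Authorization header of later requests. Here is a small standard-library sketch of that step; the base URL is an assumption for a locally running server, and `build_authorized_request` is a hypothetical helper.

```python
import urllib.request

# Assumed address of a locally running server
BASE_URL = "http://localhost:8000"


def build_authorized_request(path: str, login_response: dict) -> urllib.request.Request:
    # Send the access token as a Bearer credential, as JWTBearer expects
    token = login_response["access_token"]
    return urllib.request.Request(
        BASE_URL + path,
        headers={"Authorization": f"Bearer {token}"},
    )


# Example with a made-up login response
login_response = {"access_token": "abc.def.ghi", "token_type": "bearer"}
request = build_authorized_request("/auth/users/me", login_response)
print(request.full_url)
print(request.get_header("Authorization"))
```

Passing the request to urllib.request.urlopen would perform the call once the server is running.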
File structure for your reference:
project/
│
├── __init__.py
├── main.py
├── db.py             # Database configuration and session handling
├── models.py         # SQLAlchemy models, or raw queries for interacting with the database
├── schemas.py        # Pydantic schemas
├── utils/
│   ├── password.py   # Password hashing helpers
│   └── auth.py       # JWT authentication functions
├── routes/
│   └── auth.py       # Authentication routes
├── .env              # Environment variables
├── config.py         # .env linking
├── requirements.txt  # Python dependencies
└── run.py            # Optional script to run the FastAPI application