Mastering FastAPI CRUD Operations with Async SQLAlchemy and PostgreSQL

MyFaduGame
8 min read · Apr 15, 2024


FastAPI Async Setup with Postgres and SQLAlchemy

In this comprehensive guide, we’ll build robust CRUD (Create, Read, Update, Delete) operations with FastAPI, using SQLAlchemy’s async support and PostgreSQL for high-performance asynchronous database interactions.

Let’s divide our project into a few steps:

  1. Introduction to FastAPI and Async SQLAlchemy: We’ll start by introducing FastAPI, a modern, high-performance Python framework for building APIs, and SQLAlchemy’s asyncio extension, which enables non-blocking database operations.
  2. Setting Up the Project: We’ll guide you through setting up a FastAPI project with async SQLAlchemy and PostgreSQL. This includes creating models, configuring the database connection, and defining asynchronous CRUD operations.
  3. Async CRUD Operations: Learn how to perform asynchronous CRUD operations using FastAPI and async SQLAlchemy. We’ll cover creating, reading, updating, and deleting records from the database asynchronously.
  4. Handling Relationships: Explore how to handle relationships between database tables asynchronously using async SQLAlchemy. We’ll cover one-to-one, one-to-many, and many-to-many relationships in detail.
  5. Error Handling and Validation: Discover techniques for error handling and data validation in FastAPI, ensuring the integrity and security of your API endpoints. We’ll cover input validation, error responses, and exception handling.

What Is Async and What Is It Used For?

Async, short for asynchronous, is a programming concept used in FastAPI to achieve non-blocking, concurrent execution of tasks. Here’s a breakdown:

  1. Concurrency: Async allows FastAPI to handle multiple operations simultaneously without waiting for each operation to complete before starting the next one. This concurrency is essential for improving the responsiveness and performance of web applications, especially when dealing with I/O-bound tasks like network requests or database queries.
  2. Asynchronous Endpoints: FastAPI supports async endpoints, which are functions that can execute asynchronously. When a client sends a request to an async endpoint, FastAPI can process other requests while waiting for I/O operations to complete, resulting in better throughput and responsiveness.
  3. Improved Performance: By utilizing async programming, FastAPI can efficiently utilize system resources and handle a large number of concurrent connections without blocking the event loop. This leads to improved scalability and reduced latency in web applications.
  4. Async Testing: FastAPI also supports asynchronous testing, allowing developers to write tests for async code and ensuring the reliability of their applications. This includes testing async endpoints and calling other async functions within test cases.
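To make the concurrency point concrete, here is a minimal sketch that uses only the standard library’s asyncio. The `fake_query` coroutine and its delays are hypothetical stand-ins for real database or network calls. The three waits overlap, so the total time is close to the longest single wait rather than the sum of all three.

```python
import asyncio
import time


async def fake_query(name: str, delay: float) -> str:
    # Hypothetical stand-in for an I/O-bound call such as a database query.
    await asyncio.sleep(delay)  # hands control back to the event loop while waiting
    return f"{name} done"


async def run_concurrently():
    start = time.perf_counter()
    # gather() schedules all three coroutines at once and waits for all of them.
    results = await asyncio.gather(
        fake_query("users", 0.2),
        fake_query("orders", 0.2),
        fake_query("items", 0.2),
    )
    return list(results), time.perf_counter() - start


results, elapsed = asyncio.run(run_concurrently())
print(results, f"{elapsed:.2f}s")
```

Run sequentially, the same three calls would take about 0.6 seconds; with `gather` they finish in roughly 0.2 seconds because each one yields while it waits.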

Setting Up the Project.

Enough explanation. Let’s set up our environment so we can code our first async project in FastAPI.

First, open your IDE in a new folder. In this case I am using VSCode (Visual Studio Code). Then set up your virtual environment. I am assuming that at this point you already know what a virtual environment is and how to use one.

Installation:

Install FastAPI with pip, the Python package installer, using either “pip install fastapi[all] uvicorn” or “pip install fastapi uvicorn”. Both work; “fastapi[all]” simply installs additional optional dependencies into your virtual environment.

In my case, I used pipenv to manage the virtual environment:

python -m pipenv shell
pip install "fastapi[all]" uvicorn
# or, for a minimal install:
pip install fastapi uvicorn

Creating an Sample Request:

Now we are ready to code. Let’s start by creating a sample GET request handler for our app. The code looks as follows:

from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def read_root():
    return {"message": "Hello, World!"}

Save the code above as main.py; it is the entry point of our FastAPI application. To run it, we use “uvicorn”, which we installed earlier alongside “fastapi”:

uvicorn main:app --reload

The code should run and the terminal should look like this.

Running our basic FastAPI system

Our basic project is now set up. Next, we can set up the PostgreSQL connection and an async session for querying the database.

Database Connection with Async Session.

Let’s connect our database, PostgreSQL, to the app by following the steps below. For this step I created a new folder named config, which stores the database configuration (in config/database.py) and any other configuration we may need. This is the architecture I usually follow; feel free to customize it to your needs.

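# SQLAlchemy and dotenv imports
import os

from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()
load_dotenv()  # read key=value pairs from a local .env file into the environment

# The variable names are assumptions; match them to the keys in your .env file.
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_NAME = os.getenv("DB_NAME")


class AsyncDatabaseSession:
    def __init__(self):
        self._session = None
        self._engine = None

    def __getattr__(self, name):
        # Delegate unknown attributes (add, execute, commit, ...) to the session.
        return getattr(self._session, name)

    def init(self):
        self._engine = create_async_engine(
            f"postgresql+asyncpg://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}",
            future=True,
            echo=True,
        )
        self._session = sessionmaker(
            self._engine, expire_on_commit=False, class_=AsyncSession
        )()

    async def create_all(self):
        # Open a transaction and create every table registered on Base.
        async with self._engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)


db = AsyncDatabaseSession()

# Optional synchronous engine. `Config.SYNC_DB_CONFIG` is not defined in this
# article, so supply your own synchronous URL before enabling this line.
# database = create_engine(Config.SYNC_DB_CONFIG, echo=True)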

Let’s break this down for a better explanation. At the top, we import what we need from SQLAlchemy to create a database connection and an async engine, along with load_dotenv from python-dotenv.

Next comes dotenv. We should not hard-code database credentials in the app, so we need a safer way to manage them. I chose a .env file, which load_dotenv reads at startup.

To install python-dotenv, along with SQLAlchemy and the asyncpg driver that the connection URL uses, run:

pip install python-dotenv sqlalchemy asyncpg

Next, I created variables that hold the credentials from .env: the database user, password, host, port, and name.
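As a rough sketch of how such variables can be turned into a connection URL, the snippet below reads them from a mapping (the process environment by default) and escapes the password. The function name `build_database_url`, the variable names, and the default values are all assumptions for illustration; match them to your own .env file.

```python
import os
from urllib.parse import quote_plus


def build_database_url(env=os.environ) -> str:
    """Assemble an asyncpg connection URL from environment-style variables."""
    # Variable names and defaults are assumptions, not part of any library.
    user = env.get("DB_USER", "postgres")
    # Escape characters such as "@" or "/" that would otherwise break the URL.
    password = quote_plus(env.get("DB_PASSWORD", ""))
    host = env.get("DB_HOST", "localhost")
    port = env.get("DB_PORT", "5432")
    name = env.get("DB_NAME", "postgres")
    return f"postgresql+asyncpg://{user}:{password}@{host}:{port}/{name}"


# Example values only; real credentials belong in .env, not in source code.
example_env = {
    "DB_USER": "app",
    "DB_PASSWORD": "p@ss/word",
    "DB_HOST": "db.internal",
    "DB_PORT": "5432",
    "DB_NAME": "appdb",
}
url = build_database_url(example_env)
print(url)
```

Escaping the password matters because an unescaped “@” or “/” would be read as part of the host or path.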

This code snippet defines a class named `AsyncDatabaseSession`, which is designed to manage an asynchronous database session using SQLAlchemy in a FastAPI application. Let’s break down the code:

1. Class Definition: The `AsyncDatabaseSession` class is defined with an `__init__` initializer and three other methods: `__getattr__`, `init`, and `create_all`. The `__getattr__` method forwards attribute access to the `_session` object, and the `init` method creates the engine and the session.

2. Attributes:
  • `_session`: This attribute holds the database session object.
  • `_engine`: This attribute holds the database engine object.

3. `__getattr__` Method: This method is a special method that gets called when an attribute lookup fails. It allows access to attributes of the `_session` object, delegating attribute access to it.

4. `init` Method:

  • Inside the `init` method, an asynchronous engine is created using `create_async_engine` from SQLAlchemy. The engine is configured to use the asyncpg driver for PostgreSQL.
  • The `sessionmaker` function creates a session factory bound to this engine. The `class_` parameter tells it to produce `AsyncSession` objects, the async session class that SQLAlchemy provides in `sqlalchemy.ext.asyncio`.
  • Finally, the session is instantiated and stored in the `_session` attribute.

5. `create_all` Method: This method is intended to create all tables in the database. To do so it has to open a connection with `self._engine.begin()` and run `Base.metadata.create_all` on it through `run_sync`; referencing `self._engine.begin` without calling it executes nothing.
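The `__getattr__` delegation described in point 3 can be tried in isolation. The sketch below uses only plain Python; `FakeSession` and `SessionWrapper` are hypothetical stand-ins for the real session and for `AsyncDatabaseSession`.

```python
class FakeSession:
    """Hypothetical stand-in for a real database session."""

    def __init__(self):
        self.pending = []

    def add(self, item):
        self.pending.append(item)


class SessionWrapper:
    def __init__(self):
        self._session = None  # nothing to delegate to until init() runs

    def init(self):
        self._session = FakeSession()

    def __getattr__(self, name):
        # Called only when normal lookup fails, so `_session` and `init`
        # are found directly and never reach this method.
        return getattr(self._session, name)


wrapper = SessionWrapper()
wrapper.init()
wrapper.add("user-1")   # forwarded to FakeSession.add
print(wrapper.pending)  # forwarded to FakeSession.pending
```

One consequence of this design is that calling a session method before `init()` raises an `AttributeError`, because the lookup is forwarded to `None`.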

Database Models.

Now that our connection is set up, let’s create the database models, which map to the tables we will query.

In the same folder as the database connection file (“config/database.py”), create a new file named “models.py”. All the models for our application will be declared here.

# SQLAlchemy and Pydantic imports
from pydantic import BaseModel
from sqlalchemy import Column, Integer, String

# Reuse the Base declared in config/database.py so all models share one metadata.
from config.database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    email = Column(String, unique=True, index=True)
    full_name = Column(String)


class UserCreate(BaseModel):
    username: str
    email: str
    full_name: str


class UserInDB(UserCreate):
    id: int


class UserUpdate(BaseModel):
    email: str
    full_name: str


# Optional: Define response models for API responses
class UserResponse(BaseModel):
    username: str
    email: str
    full_name: str

As you can see, I also added the Pydantic models to this code block. I usually keep Pydantic models in a services folder, but for this example I declared them here. Feel free to modify this according to your needs.
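To see what the Pydantic models contribute, here is a minimal sketch, assuming Pydantic is installed. It redeclares `UserCreate` so the snippet is self-contained; `try_create` is a hypothetical helper, not part of FastAPI or Pydantic. FastAPI runs the same validation automatically on request bodies and replies with a 422 error when it fails.

```python
from pydantic import BaseModel, ValidationError


class UserCreate(BaseModel):
    username: str
    email: str
    full_name: str


def try_create(payload: dict):
    """Return a validated UserCreate, or the names of the invalid fields."""
    try:
        return UserCreate(**payload)
    except ValidationError as exc:
        # Each error entry carries a "loc" tuple naming the offending field.
        return [err["loc"][0] for err in exc.errors()]


ok = try_create(
    {"username": "ada", "email": "ada@example.com", "full_name": "Ada Lovelace"}
)
bad = try_create({"username": "ada"})  # email and full_name are missing
print(ok)
print(bad)
```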

Now let’s create the CRUD endpoints for this model.

Async CRUD

This example defines endpoints for creating, reading, updating, and deleting users. It uses an async session for database operations, so the API does not block while it waits on the database.

# FastAPI and SQLAlchemy imports
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException, Depends, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# Local Imports
from config.models import User, UserCreate, UserUpdate, UserInDB, UserResponse
from config.database import db


@asynccontextmanager
async def lifespan(app: FastAPI):
    db.init()  # build the engine and session before serving requests
    await db.create_all()
    yield


# Replaces the earlier `app = FastAPI()` in main.py
app = FastAPI(lifespan=lifespan)


async def get_session():
    # Assumed helper: the original code uses `get_session` without defining it.
    # It hands out the shared `db` wrapper, which proxies to the AsyncSession.
    # For concurrent traffic, one session per request is the safer pattern.
    yield db


# Create
@app.post("/users/", response_model=UserResponse)
async def create_user(user_create: UserCreate, session: AsyncSession = Depends(get_session)):
    user = User(**user_create.dict())  # on Pydantic v2, prefer model_dump()
    session.add(user)
    await session.commit()
    return user


# Read
@app.get("/users/{user_id}", response_model=UserResponse)
async def read_user(user_id: int, session: AsyncSession = Depends(get_session)):
    result = await session.execute(select(User).filter(User.id == user_id))
    user = result.scalars().first()
    if user is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
    return user


# Update
@app.put("/users/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user_update: UserUpdate, session: AsyncSession = Depends(get_session)):
    result = await session.execute(select(User).filter(User.id == user_id))
    user = result.scalars().first()
    if user is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
    for field, value in user_update.dict(exclude_unset=True).items():
        setattr(user, field, value)
    await session.commit()
    return user


# Delete
@app.delete("/users/{user_id}", response_model=dict)
async def delete_user(user_id: int, session: AsyncSession = Depends(get_session)):
    result = await session.execute(select(User).filter(User.id == user_id))
    user = result.scalars().first()
    if user is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
    await session.delete(user)  # AsyncSession.delete() is a coroutine
    await session.commit()
    return {"message": "User deleted successfully"}

Let’s break down the code and understand what is going on under the hood.

1. Imports: The code starts by importing the necessary modules and classes from FastAPI and SQLAlchemy, followed by the local imports from our config package.

2. CRUD Operations:

  • Create User: `create_user` route is defined with a POST method. It takes a `UserCreate` object as input, validates it, creates a new `User` object, adds it to the session, commits the session to persist changes, and returns the created user.
  • Read User: `read_user` route is defined with a GET method. It takes a user ID as input, queries the database for the user with the given ID, handles the case where the user is not found, and returns the user data.
  • Update User: `update_user` route is defined with a PUT method. It takes a user ID and a `UserUpdate` object as input, queries the database for the user with the given ID, updates the user’s fields with the provided values, commits the session to persist changes, and returns the updated user data.
  • Delete User: `delete_user` route is defined with a DELETE method. It takes a user ID as input, queries the database for the user with the given ID, deletes the user from the session, commits the session to persist changes, and returns a success message.
  • Dependency Injection: Dependency injection is used in route functions to inject the async session (`AsyncSession`) created by the `get_session` dependency.
  • Error Handling: The code includes basic error handling using `HTTPException`, returning a 404 status code and an error message when a user is not found.
  • Response Models: Response models are defined using Pydantic schemas (`UserResponse`) to specify the structure of API responses.
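The update step is plain attribute assignment, which can be sketched without a database. In this illustration `UserRecord` and `apply_update` are hypothetical stand-ins for the SQLAlchemy `User` object and the loop inside `update_user`; the `changes` dict plays the role of `user_update.dict(exclude_unset=True)`.

```python
from dataclasses import dataclass


@dataclass
class UserRecord:
    # Hypothetical plain-Python stand-in for the SQLAlchemy `User` row.
    id: int
    username: str
    email: str
    full_name: str


def apply_update(user: UserRecord, changes: dict) -> UserRecord:
    # Mirrors the loop in `update_user`: copy each supplied field onto the record.
    for field, value in changes.items():
        setattr(user, field, value)
    return user


user = UserRecord(1, "ada", "ada@example.com", "Ada")
apply_update(user, {"full_name": "Ada Lovelace"})  # only one field supplied
print(user)
```

Fields that are absent from `changes` keep their current values, which is why the endpoint only overwrites what the client actually sent.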

If everything is in place, the application should start with uvicorn main:app --reload

Conclusion

In summary, the provided FastAPI application efficiently manages user data with CRUD operations. Leveraging asynchronous database interactions and Pydantic for data validation, it ensures reliability and performance. Through concise error handling and clear response models, the application offers a robust user experience.
