FastAPI with PostgreSQL Part 2

Arturo Cuicas
Feb 21, 2023


I hope you found Part 1 interesting. In Part 2, the final part of this FastAPI project, we will first create a Makefile to manage our commands and make our lives easier. Then we will write tests for the methods from Part 1 (create, list, get, delete), add new endpoints with their own test cases, and finish with pagination and soft delete.

In summary, this section will have:

  • Makefile for running commands!
  • Tests Environment
  • Tests for endpoints
  • Pagination
  • Soft Delete

We start with the Makefile, adding the most basic commands to start the environment and run the tests.

coffee:
	@printf 'Enjoy your coffee! \xE2\x98\x95'

dev:
	@docker compose -f docker-compose.yaml up --build

run:
	@docker compose -f docker-compose.yaml up --build -d

down:
	@docker compose -f ./docker-compose.yaml down --remove-orphans

shell: run
	@docker exec -it fastapi_service bash

tests: run
	@docker exec -it fastapi_service poetry run pytest

.PHONY: coffee dev run down shell tests

If we try it and run make coffee:

❯ make coffee
Enjoy your coffee! ☕

As you can see, some targets depend on others: shell and tests both depend on run, so the environment is started in the background before we open a shell in the API container or run the tests.

Now that we have a Makefile to run our commands, we are going to configure the testing environment. For this, we install pytest and a few other packages.

poetry add pytest pytest-asyncio httpx --dev

The first thing we are going to do is create the directories, and then we will configure our conftest.py.

└── tests
    ├── conftest.py
    ├── test_repositories
    │   └── test_transactions.py
    ├── test_routes
    │   └── test_transactions.py
    └── test_schemas
        └── test_transactions.py

This structure is only a suggestion. The idea is that each person or team finds the best way to test their application and prevent future errors in the code.

Now we are going to configure our tests in conftest.py, starting with the connection to the test database.

import asyncio
from typing import AsyncGenerator, Callable, Generator

import pytest
import pytest_asyncio
from fastapi import FastAPI
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlmodel import SQLModel
from sqlmodel.ext.asyncio.session import AsyncSession

from core.config import settings

test_db = (
    f"postgresql+asyncpg://{settings.postgres_user}:{settings.postgres_password}"
    f"@{settings.postgres_server}:{settings.postgres_port}/{settings.postgres_db_tests}"
)

engine = create_async_engine(
    test_db,
    echo=settings.db_echo_log,
    future=True,
)

async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


@pytest_asyncio.fixture(scope="session")
def event_loop(request) -> Generator:
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()


@pytest_asyncio.fixture()
async def db_session() -> AsyncSession:
    async with engine.begin() as connection:
        await connection.run_sync(SQLModel.metadata.drop_all)
        await connection.run_sync(SQLModel.metadata.create_all)
        async with async_session(bind=connection) as session:
            yield session
            await session.flush()
            await session.rollback()


@pytest.fixture()
def override_get_db(db_session: AsyncSession) -> Callable:
    async def _override_get_db():
        yield db_session

    return _override_get_db


@pytest.fixture()
def app(override_get_db: Callable) -> FastAPI:
    from api.dependencies.repositories import get_db
    from main import app

    app.dependency_overrides[get_db] = override_get_db

    return app


@pytest_asyncio.fixture()
async def async_client(app: FastAPI) -> AsyncGenerator:
    async with AsyncClient(app=app, base_url="http://test") as ac:
        yield ac

I know this code seems a bit complex, but we are simply creating an asynchronous engine for our test database (test_db), then a session, and finally an HTTP client. FastAPI lets us override dependencies through app.dependency_overrides, which is what we do with the get_db function so that the routes use the test session.
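To make the override idea less abstract, here is a minimal, framework-free sketch. It is not how FastAPI is implemented internally; resolve, get_db, and get_test_db below are hypothetical names used only for illustration. The point is simply that a dictionary maps the original dependency to its replacement, and the replacement is called whenever one is registered.

```python
from typing import Any, Callable, Dict


# Hypothetical stand-ins: in the real project these would be the production
# get_db dependency and the override built in conftest.py.
def get_db() -> str:
    return "production-session"


def get_test_db() -> str:
    return "test-session"


# Maps an original dependency to its replacement, similar in spirit to
# app.dependency_overrides in FastAPI.
dependency_overrides: Dict[Callable[..., Any], Callable[..., Any]] = {}


def resolve(dependency: Callable[..., Any]) -> Any:
    """Call the override if one is registered, otherwise the original."""
    provider = dependency_overrides.get(dependency, dependency)
    return provider()


before = resolve(get_db)  # no override registered yet
dependency_overrides[get_db] = get_test_db
after = resolve(get_db)  # the override is used instead
dependency_overrides.clear()  # clean up so later code sees the original
restored = resolve(get_db)
```

Because the routes only ever ask for get_db, none of them need to change for the tests to talk to a different database.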

To continue, we need a script that creates both databases. If you have problems creating them, you may need to remove the volume and run make dev again.

CREATE DATABASE transactions_db;  
CREATE DATABASE test_transactions_db;

GRANT ALL PRIVILEGES ON DATABASE transactions_db to "postgres";
GRANT ALL PRIVILEGES ON DATABASE test_transactions_db to "postgres";

We mount this script in our docker-compose.yaml so that PostgreSQL runs it the first time the database container is initialized, which creates both databases.

  db_postgres:
    image: postgres:14.3-alpine
    hostname: db_postgres
    container_name: db_postgres
    restart: on-failure
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    ports:
      - "5432:5432"
    volumes:
      - db-postgres:/var/lib/postgresql/data
      - ./scripts/create_databases.sql:/docker-entrypoint-initdb.d/create_databases.sql
    networks:
      - my-net

As you can see, we created a scripts folder to store our create_databases.sql file.

Ready! The first thing we are going to test is the repository layer, starting with the create method. For this, we have modified the TransactionRead schema so that it returns the id.

The id must also be included in the methods that use TransactionRead as a response.

from uuid import UUID

from db.tables.transactions import TransactionBase


class TransactionCreate(TransactionBase):
    ...


class TransactionRead(TransactionBase):
    id: UUID

Before starting with the tests, we are going to create fixtures that build transactions in a simple way. I will not go into detail here on how fixtures work, but I leave the link to the documentation. Let’s create them in our conftest.py file so we can use them in our tests:

@pytest.fixture()
def create_transaction():
    def _create_transaction(
        amount: int = 10,
        description: str = "Text description",
    ):
        return TransactionCreate(
            amount=amount,
            description=description
        )

    return _create_transaction


@pytest.fixture()
def create_transactions(create_transaction):
    def _create_transactions(
        _qty: int = 1
    ):
        return [
            create_transaction(
                amount=i,
                description=f"Transaction number {i}"
            )
            for i in range(_qty)]

    return _create_transactions
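The "factory as fixture" pattern used here can be tried outside pytest. The sketch below is only an illustration under assumptions: the dataclass is a hypothetical stand-in for the project's TransactionCreate schema, and make_create_transaction and make_create_transactions are hypothetical helpers that play the role of the two fixtures.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class TransactionCreate:
    """Hypothetical stand-in for the project's TransactionCreate schema."""

    amount: int
    description: str


def make_create_transaction() -> Callable[..., TransactionCreate]:
    """Plays the role of the create_transaction fixture: returns a factory."""

    def _create_transaction(
        amount: int = 10, description: str = "Text description"
    ) -> TransactionCreate:
        return TransactionCreate(amount=amount, description=description)

    return _create_transaction


def make_create_transactions(
    create_transaction: Callable[..., TransactionCreate],
) -> Callable[..., List[TransactionCreate]]:
    """Plays the role of create_transactions: builds N objects via the factory."""

    def _create_transactions(_qty: int = 1) -> List[TransactionCreate]:
        return [
            create_transaction(amount=i, description=f"Transaction number {i}")
            for i in range(_qty)
        ]

    return _create_transactions


create_transaction = make_create_transaction()
create_transactions = make_create_transactions(create_transaction)

default_tx = create_transaction()  # uses the default amount and description
batch = create_transactions(_qty=3)
amounts = [tx.amount for tx in batch]  # range(3) starts at 0, so: [0, 1, 2]
```

Returning a function instead of a ready-made object lets each test decide how many transactions it needs and with which values. Note that the batch amounts start at 0 because they come from range().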

You will notice that there are two functions, one to create a single transaction and one to create N transactions. Now we can write our first repository test, which creates a transaction:

from uuid import UUID

import pytest
from sqlmodel.ext.asyncio.session import AsyncSession

from db.repositories.transactions import TransactionRepository


@pytest.mark.asyncio
async def test_create_transaction(
    db_session: AsyncSession,
    create_transaction
):
    transaction = create_transaction()
    repository = TransactionRepository(db_session)

    db_transaction = await repository.create(transaction)

    assert db_transaction.amount == transaction.amount
    assert db_transaction.description == transaction.description
    assert isinstance(db_transaction.id, UUID)

Let’s run make tests and see our first test run!

❯ make tests
...

============================= test session starts =========================
platform linux -- Python 3.10.10, pytest-7.2.1, pluggy-1.0.0
rootdir: /home/app
plugins: anyio-3.6.2, asyncio-0.20.3
asyncio: mode=strict
collected 1 item

============================= 1 passed in 0.09s ===========================

Nice! Now we add the test to obtain all the Transactions:

@pytest.mark.asyncio
async def test_get_transactions(
    db_session: AsyncSession,
    create_transaction
):
    transaction = create_transaction()
    repository = TransactionRepository(db_session)
    await repository.create(transaction)

    db_transactions = await repository.list()

    assert isinstance(db_transactions, list)
    assert db_transactions[0].amount == transaction.amount
    assert db_transactions[0].description == transaction.description

Now we are going to test getting a transaction by its ID. If the transaction does not exist, we expect an EntityDoesNotExist exception. For this, we need a random UUID from uuid4(), and we also need to import our EntityDoesNotExist exception:

@pytest.mark.asyncio
async def test_get_transaction_by_id(
    db_session: AsyncSession,
    create_transaction
):
    transaction = create_transaction()
    repository = TransactionRepository(db_session)

    transaction_created = await repository.create(transaction)
    transaction_db = await repository.get(transaction_id=transaction_created.id)

    assert transaction_created == transaction_db


@pytest.mark.asyncio
async def test_get_transaction_by_id_not_found(
    db_session: AsyncSession
):
    repository = TransactionRepository(db_session)

    with pytest.raises(expected_exception=EntityDoesNotExist):
        await repository.get(transaction_id=uuid4())
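The contract these two tests check (return the row when it exists, raise when it does not) can be sketched without a database. Everything below is an assumption made for illustration: InMemoryRepository is a hypothetical toy class that stores rows in a dict, and EntityDoesNotExist is redefined locally as a stand-in for the project's exception.

```python
import asyncio
from typing import Dict, Tuple
from uuid import UUID, uuid4


class EntityDoesNotExist(Exception):
    """Hypothetical stand-in for the project's exception of the same name."""


class InMemoryRepository:
    """Toy repository that keeps rows in a dict instead of PostgreSQL."""

    def __init__(self) -> None:
        self._rows: Dict[UUID, dict] = {}

    async def create(self, amount: int, description: str) -> dict:
        row = {"id": uuid4(), "amount": amount, "description": description}
        self._rows[row["id"]] = row
        return row

    async def get(self, transaction_id: UUID) -> dict:
        try:
            return self._rows[transaction_id]
        except KeyError:
            # Translate the storage-level miss into the domain exception.
            raise EntityDoesNotExist(str(transaction_id)) from None


async def scenario() -> Tuple[bool, bool]:
    repository = InMemoryRepository()
    created = await repository.create(amount=10, description="Text description")
    found = await repository.get(created["id"])
    try:
        await repository.get(uuid4())  # random id that was never stored
        missing_raised = False
    except EntityDoesNotExist:
        missing_raised = True
    return created == found, missing_raised


found_matches, missing_raised = asyncio.run(scenario())
```

Raising a dedicated exception, rather than returning None, is what lets the route layer later translate a missing row into a 404 response.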

Perfect! We already have our main repository tests, so now we move on to the update and the soft delete. For these, we are going to do some TDD: we start with the tests and then write the methods.

For the update, we are going to create a new schema, TransactionPatch, that controls which parameters can be updated through our method. It is also advisable to enforce this in the method itself.

@pytest.mark.asyncio
async def test_update_transaction(
    db_session: AsyncSession,
    create_transaction
):
    init_amount = 10
    init_description = "Initial Description"
    final_amount = 20
    final_description = "Final Description"
    transaction = create_transaction(amount=init_amount, description=init_description)
    repository = TransactionRepository(db_session)

    db_transaction = await repository.create(transaction)

    update_transaction = await repository.patch(
        transaction_id=db_transaction.id,
        transaction_patch=TransactionPatch(
            amount=final_amount,
            description=final_description
        )
    )

    assert update_transaction.id == db_transaction.id
    assert update_transaction.amount == final_amount
    assert update_transaction.description == final_description

If we execute it we will have the following result:

E       AttributeError: 'TransactionRepository' object has no attribute 'patch'

tests/test_repositories/test_transactions.py:79: AttributeError
================================================== short test summary info ===========================================================================
FAILED tests/test_repositories/test_transactions.py::test_update_transaction - AttributeError: 'TransactionRepository' object has no attribute 'patch'
================================================== 1 failed, 4 passed in 0.17s =======================================================================
make: *** [Makefile:15: tests] Error 1

Now we are going to create the patch method so that the test passes:

async def patch(self, transaction_id: UUID, transaction_patch: TransactionPatch) -> Optional[TransactionRead]:
    db_transaction = await self.session.get(Transaction, transaction_id)

    if db_transaction is None:
        raise EntityDoesNotExist

    transaction_data = transaction_patch.dict(exclude_unset=True, exclude={"id"})
    for key, value in transaction_data.items():
        setattr(db_transaction, key, value)

    self.session.add(db_transaction)
    await self.session.commit()
    await self.session.refresh(db_transaction)

    return TransactionRead(**db_transaction.dict())

As you can see, we use Pydantic's dict() method to get the attributes of the instance, leaving out those that were not sent and the id (this is only an example of what dict() can do). If we run the tests, we get the following result:

platform linux -- Python 3.10.10, pytest-7.2.1, pluggy-1.0.0
rootdir: /home/app
plugins: anyio-3.6.2, asyncio-0.20.3
asyncio: mode=strict
collected 5 items
tests/test_repositories/test_transactions.py .....
============================= 5 passed in 0.16s ==========================
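To see why leaving out unset fields matters for a partial update, here is a small sketch in plain Python. It does not use Pydantic; UNSET, patch_payload, and apply_patch are hypothetical helpers that imitate the effect of dict(exclude_unset=True, exclude={"id"}) on a plain dictionary.

```python
from typing import Any, Dict

# Sentinel meaning "the client did not send this field". This is a
# hypothetical helper; Pydantic tracks which fields were set for you.
UNSET: Any = object()


def patch_payload(amount: Any = UNSET, description: Any = UNSET) -> Dict[str, Any]:
    """Return only the fields that were explicitly provided."""
    candidates = {"amount": amount, "description": description}
    return {key: value for key, value in candidates.items() if value is not UNSET}


def apply_patch(row: Dict[str, Any], changes: Dict[str, Any]) -> Dict[str, Any]:
    """Copy the row and overwrite only the provided fields, never the id."""
    updated = dict(row)
    for key, value in changes.items():
        if key != "id":
            updated[key] = value
    return updated


stored = {"id": 1, "amount": 10, "description": "Initial Description"}

# Only amount was sent, so description keeps its stored value.
partial = apply_patch(stored, patch_payload(amount=20))

# Both fields were sent, so both are overwritten.
full = apply_patch(stored, patch_payload(amount=20, description="Final Description"))
```

Without this distinction, a request that only sends amount would overwrite description with an empty default instead of leaving it alone.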

Now let’s go with the soft delete. For this, we will introduce enums, which help us control the possible values of a field in our table. In this case, we will manage the status of an instance, with the possible values active, inactive, and deleted. Let’s start with the enum. In the base_class.py of our tables, we create the following class:

class StatusEnum(str, Enum):
    active = "active"
    inactive = "inactive"
    deleted = "deleted"
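As a quick illustration of what inheriting from both str and Enum gives us, the following standalone sketch repeats the class and checks a few behaviours using only the standard library. The value "archived" is just a made-up example of an invalid status.

```python
import json
from enum import Enum


class StatusEnum(str, Enum):
    active = "active"
    inactive = "inactive"
    deleted = "deleted"


# Members compare equal to their plain string values thanks to the str mixin.
is_equal_to_string = StatusEnum.deleted == "deleted"

# Lookup by value returns the member; unknown values are rejected.
parsed = StatusEnum("active")
try:
    StatusEnum("archived")
    rejected = False
except ValueError:
    rejected = True

# json.dumps serializes the member as its underlying string.
encoded = json.dumps({"status": StatusEnum.inactive})
```

This is why the status can travel through JSON responses as a plain string while still being restricted to three values in our code.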

Now we are going to use it in our Transaction model, updating the model in the process:

from sqlmodel import Field, SQLModel

from db.tables.base_class import TimestampModel, UUIDModel, StatusEnum


class TransactionBase(SQLModel):
    amount: int = Field(nullable=False)
    description: str = Field(nullable=False)


class Transaction(TransactionBase, UUIDModel, TimestampModel, table=True):
    status: StatusEnum = Field(default=StatusEnum.inactive)

    __tablename__ = "transactions"

Now let’s update our delete method so that it can perform the soft delete:

async def delete(self, transaction_id: UUID) -> None:
    db_transaction = await self.session.get(Transaction, transaction_id)

    if db_transaction is None:
        raise EntityDoesNotExist

    setattr(db_transaction, "status", StatusEnum.deleted)
    self.session.add(db_transaction)

    await self.session.commit()

As you can see, on the repository side we return None when everything went correctly.

We are now ready to create our soft delete test:

@pytest.mark.asyncio
async def test_soft_delete_transaction(
    db_session: AsyncSession,
    create_transaction
):
    transaction = create_transaction()
    repository = TransactionRepository(db_session)
    db_transaction = await repository.create(transaction)

    delete_transaction = await repository.delete(transaction_id=db_transaction.id)
    transaction = await repository.get(transaction_id=db_transaction.id)

    assert delete_transaction is None
    assert transaction is None

And if we run the tests we see the following:

E       AssertionError: assert TransactionRead(amount=10, description='Text description', id=UUID('5fdb0f19-bc92-4b11-a2b7-e0ea535cfed1')) is None

tests/test_repositories/test_transactions.py:104: AssertionError
========================================================================================================== short test summary info =============================================================================================
FAILED tests/test_repositories/test_transactions.py::test_soft_delete_transaction - AssertionError: assert TransactionRead(amount=10, description='Text description', id=UUID('5fdb0f19-bc92-4b11-a2b7-e0ea535cfed1')) is None
======================================================================================================== 1 failed, 5 passed in 0.20s ===========================================================================================
make: *** [Makefile:15: tests] Error 1

This is because we are not filtering out deleted transactions in the get, list, patch, or delete methods. What we are going to do is create a private method that only fetches transactions that have not been deleted:

class TransactionRepository:
    def __init__(self, session: AsyncSession) -> None:
        self.session = session

    async def _get_instance(self, transaction_id: UUID):
        statement = (
            select(Transaction)
            .where(Transaction.id == transaction_id)
            .where(Transaction.status != StatusEnum.deleted)
        )
        results = await self.session.exec(statement)

        return results.first()

    async def create(self, transaction_create: TransactionCreate) -> TransactionRead:
        db_transaction = Transaction.from_orm(transaction_create)
        self.session.add(db_transaction)
        await self.session.commit()
        await self.session.refresh(db_transaction)

        return TransactionRead(**db_transaction.dict())

    async def list(self) -> list[TransactionRead]:
        statement = (
            select(Transaction)
            .where(Transaction.status != StatusEnum.deleted)
        )
        results = await self.session.exec(statement)

        return [
            TransactionRead(**transaction.dict())
            for transaction in results
        ]

    async def get(self, transaction_id: UUID) -> Optional[TransactionRead]:
        db_transaction = await self._get_instance(transaction_id)

        if db_transaction is None:
            raise EntityDoesNotExist

        return TransactionRead(**db_transaction.dict())

    async def patch(self, transaction_id: UUID, transaction_patch: TransactionPatch) -> Optional[TransactionRead]:
        db_transaction = await self._get_instance(transaction_id)

        if db_transaction is None:
            raise EntityDoesNotExist

        transaction_data = transaction_patch.dict(exclude_unset=True, exclude={"id"})
        for key, value in transaction_data.items():
            setattr(db_transaction, key, value)

        self.session.add(db_transaction)
        await self.session.commit()
        await self.session.refresh(db_transaction)

        return TransactionRead(**db_transaction.dict())

    async def delete(self, transaction_id: UUID) -> None:
        db_transaction = await self._get_instance(transaction_id)

        if db_transaction is None:
            raise EntityDoesNotExist

        setattr(db_transaction, "status", StatusEnum.deleted)
        self.session.add(db_transaction)

        await self.session.commit()
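The soft delete idea itself is independent of SQLModel. As a rough sketch, the same behaviour can be reproduced with the standard library's sqlite3 module. The table layout, the integer ids, and the helper names soft_delete and list_visible are simplifications I am assuming for the example; they are not the project's schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE transactions ("
    "id INTEGER PRIMARY KEY, amount INTEGER NOT NULL, "
    "status TEXT NOT NULL DEFAULT 'inactive')"
)
conn.executemany(
    "INSERT INTO transactions (id, amount) VALUES (?, ?)",
    [(1, 10), (2, 20), (3, 30)],
)


def soft_delete(transaction_id: int) -> None:
    """Mark the row as deleted instead of removing it."""
    conn.execute(
        "UPDATE transactions SET status = 'deleted' WHERE id = ?",
        (transaction_id,),
    )


def list_visible() -> list:
    """Every read path filters out rows flagged as deleted."""
    rows = conn.execute(
        "SELECT id FROM transactions WHERE status != 'deleted' ORDER BY id"
    )
    return [row[0] for row in rows]


soft_delete(2)
visible_ids = list_visible()  # row 2 is hidden from reads
total_rows = conn.execute("SELECT COUNT(*) FROM transactions").fetchone()[0]
```

The row is still physically present, which is the whole point: the data can be audited or restored later, but every query the application runs has to remember the status filter.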

We update our delete test and run it again:

@pytest.mark.asyncio
async def test_soft_delete_transaction(
    db_session: AsyncSession,
    create_transaction
):
    transaction = create_transaction()
    repository = TransactionRepository(db_session)
    db_transaction = await repository.create(transaction)

    delete_transaction = await repository.delete(transaction_id=db_transaction.id)

    assert delete_transaction is None
    with pytest.raises(expected_exception=EntityDoesNotExist):
        await repository.get(transaction_id=db_transaction.id)

❯ make tests
...
========================== test session starts ===========================
platform linux -- Python 3.10.10, pytest-7.2.1, pluggy-1.0.0
rootdir: /home/app
plugins: anyio-3.6.2, asyncio-0.20.3
asyncio: mode=strict
collected 6 items

tests/test_repositories/test_transactions.py ......

============================= 6 passed in 0.18s ==========================

OK, now that we have tested all the repository methods, we are going to test the schemas and the routes. We will keep it very simple: for the schemas, we only test that the required fields are present and that the values have the expected type:

import pytest

from pydantic import ValidationError

from schemas.transactions import TransactionCreate


def test_transaction_instance_empty():
    with pytest.raises(expected_exception=ValidationError):
        TransactionCreate()


def test_transaction_instance_amount_empty():
    with pytest.raises(expected_exception=ValidationError):
        TransactionCreate(
            description="Description"
        )


def test_transaction_instance_description_empty():
    with pytest.raises(expected_exception=ValidationError):
        TransactionCreate(
            amount=10
        )


def test_transaction_instance_amount_wrong():
    with pytest.raises(expected_exception=ValidationError):
        TransactionCreate(
            amount="amount",
            description="Description"
        )

The possibilities here are many, but it all depends on your application and the attributes you are handling. It is also very important to control which values can pass between our service layers.

To finish with the tests, we are going to test our routes. For this, we request the async_client fixture from our conftest.py as a test argument and use it against the routes we have already created: create, list, get, and delete.

import pytest
from uuid import UUID

from fastapi import status


@pytest.mark.asyncio
async def test_get_transactions(
    async_client
):
    response = await async_client.get(
        "/api/transactions/transactions"
    )

    assert response.status_code == status.HTTP_200_OK
    assert len(response.json()) == 0


@pytest.mark.asyncio
async def test_create_transaction(
    async_client,
    create_transaction
):
    transaction = create_transaction()
    response = await async_client.post(
        "/api/transactions/transactions", json=transaction.dict()
    )

    assert response.status_code == status.HTTP_201_CREATED
    assert response.json()["amount"] == transaction.amount
    assert response.json()["description"] == transaction.description
    assert UUID(response.json()["id"])


@pytest.mark.asyncio
async def test_get_transaction(
    async_client,
    create_transaction
):
    transaction = create_transaction()
    response_create = await async_client.post(
        "/api/transactions/transactions", json=transaction.dict()
    )
    response = await async_client.get(
        f"/api/transactions/transactions/{response_create.json()['id']}"
    )

    assert response.status_code == status.HTTP_200_OK
    assert response.json()["amount"] == transaction.amount
    assert response.json()["description"] == transaction.description
    assert response.json()["id"] == response_create.json()["id"]


@pytest.mark.asyncio
async def test_delete_transaction(
    async_client,
    create_transaction
):
    transaction = create_transaction()
    response_create = await async_client.post(
        "/api/transactions/transactions", json=transaction.dict()
    )
    response = await async_client.delete(
        f"/api/transactions/transactions/{response_create.json()['id']}"
    )

    assert response.status_code == status.HTTP_204_NO_CONTENT

Now we are going to finish with the tests for the update method and for the pagination. For the update, we have the following test:

@pytest.mark.asyncio
async def test_update_transaction(
    async_client,
    create_transaction
):
    transaction = create_transaction(amount=10, description="Init Description")
    response_create = await async_client.post(
        "/api/transactions/transactions", json=transaction.dict()
    )

    new_amount = 20
    new_description = "New Description"
    response = await async_client.put(
        f"/api/transactions/transactions/{response_create.json()['id']}",
        json={"amount": new_amount, "description": new_description}
    )

    assert response.status_code == status.HTTP_200_OK
    assert response.json()["amount"] == new_amount
    assert response.json()["description"] == new_description
    assert response.json()["id"] == response_create.json()["id"]

Now we need to create this route, which we did not have:

@router.put(
    "/transactions/{transaction_id}",
    response_model=TransactionRead,
    status_code=status.HTTP_200_OK,
    name="update_transaction",
)
async def update_transaction(
    transaction_id: UUID,
    transaction_patch: TransactionPatch = Body(...),
    repository: TransactionRepository = Depends(get_repository(TransactionRepository)),
) -> TransactionRead:
    # Return a 404 if the transaction does not exist (or was soft deleted).
    try:
        await repository.get(transaction_id=transaction_id)
    except EntityDoesNotExist:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Transaction not found!"
        )

    return await repository.patch(
        transaction_id=transaction_id,
        transaction_patch=transaction_patch
    )

Almost at the end, we are going to add pagination. The test scenario is simple: we create 4 records and paginate with 2 records per page, so page 1 and page 2 will each have 2 records.

We begin with the list method of our repository. We add two parameters to control the query: limit, which defaults to 10, and offset, which defaults to zero:

async def list(self, limit: int = 10, offset: int = 0) -> list[TransactionRead]:
    statement = (
        select(Transaction)
        .where(Transaction.status != StatusEnum.deleted)
    ).offset(offset).limit(limit)
    results = await self.session.exec(statement)

    return [
        TransactionRead(**transaction.dict())
        for transaction in results
    ]
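To get a feel for how limit and offset slice a result set, here is a small standalone sketch using sqlite3 from the standard library. The table, the integer ids, and the list_page helper are assumptions made for the example. The ORDER BY is also my addition: without an explicit ordering, SQL databases do not guarantee the same row order between queries, so it may be worth adding an order_by to the repository query as well.

```python
import sqlite3
from typing import List

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE transactions (id INTEGER PRIMARY KEY, amount INTEGER)")
conn.executemany(
    "INSERT INTO transactions (id, amount) VALUES (?, ?)",
    [(i, i * 10) for i in range(1, 5)],  # four rows with ids 1 to 4
)


def list_page(limit: int = 10, offset: int = 0) -> List[int]:
    """Return one page of ids; ORDER BY keeps pages stable between calls."""
    rows = conn.execute(
        "SELECT id FROM transactions ORDER BY id LIMIT ? OFFSET ?",
        (limit, offset),
    )
    return [row[0] for row in rows]


page_1 = list_page(limit=2)  # [1, 2]
page_2 = list_page(limit=2, offset=2)  # [3, 4]
everything = list_page()  # [1, 2, 3, 4]
past_the_end = list_page(limit=2, offset=4)  # []
```

An offset beyond the last row simply yields an empty page, which is a convenient signal for a client that there is nothing left to fetch.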

Now let’s modify our route to receive these query params:

@router.get(
    "/transactions",
    response_model=list[Optional[TransactionRead]],
    status_code=status.HTTP_200_OK,
    name="get_transactions",
)
async def get_transactions(
    # le=100 caps the page size; Query has no "lte" validation argument.
    limit: int = Query(default=10, le=100),
    offset: int = Query(default=0),
    repository: TransactionRepository = Depends(get_repository(TransactionRepository)),
) -> list[Optional[TransactionRead]]:
    return await repository.list(limit=limit, offset=offset)

Here you will notice that you have to import Query from fastapi. Now we are going to write a very simple test:

@pytest.mark.asyncio
async def test_get_transaction_paginated(
    db_session,
    async_client,
    create_transactions
):
    repository = TransactionRepository(db_session)
    for transaction in create_transactions(_qty=4):
        await repository.create(transaction)

    response_page_1 = await async_client.get(
        "/api/transactions/transactions?limit=2"
    )
    assert len(response_page_1.json()) == 2

    response_page_2 = await async_client.get(
        "/api/transactions/transactions?limit=2&offset=2"
    )
    assert len(response_page_2.json()) == 2

    response = await async_client.get(
        "/api/transactions/transactions"
    )
    assert len(response.json()) == 4

At the end of running our tests we will have the following result:

========================== test session starts ===========================
platform linux -- Python 3.10.10, pytest-7.2.1, pluggy-1.0.0
rootdir: /home/app
plugins: anyio-3.6.2, asyncio-0.20.3
asyncio: mode=strict
collected 16 items

tests/test_repositories/test_transactions.py ......
tests/test_routes/test_transactions.py ......
tests/test_schemas/test_transactions.py ....
============================ 16 passed in 0.37s ==========================

We are done! Now, to make the project more presentable before uploading it, we are going to add a lint command to our Makefile that formats the code:

lint: run
	@docker exec -it fastapi_service poetry run black .
	@docker exec -it fastapi_service poetry run isort . --profile black

And we run it!

❯ make lint

....

reformatted tests/test_schemas/test_transactions.py
reformatted api/routes/transactions.py
reformatted tests/test_repositories/test_transactions.py
reformatted db/repositories/transactions.py
reformatted tests/conftest.py
reformatted tests/test_routes/test_transactions.py

All done! ✨ 🍰 ✨
6 files reformatted, 23 files left unchanged.
Fixing /home/app/tests/conftest.py
Fixing /home/app/tests/test_schemas/test_transactions.py
Fixing /home/app/tests/test_repositories/test_transactions.py
Fixing /home/app/tests/test_routes/test_transactions.py
Fixing /home/app/db/repositories/transactions.py
Fixing /home/app/db/tables/transactions.py
Fixing /home/app/schemas/transactions.py
Fixing /home/app/api/routes/transactions.py
Skipped 1 files

Conclusion

I hope you liked this project, and I apologize because I know it was a bit long, but I wanted a base project that will serve as a reference for future scenarios (Kafka, Redis, gRPC) [Spoilers!]. I also wanted to emphasize the importance of testing in application development, not only to validate that everything works but also to support maintenance and improvements, especially when we work in teams and want everything to be automated and standardized.

Leave me a comment if you have any questions or suggestions; they are always welcome. I leave the link to the repo of the project.

Have a great day!

Source Code
