FastAPI with PostgreSQL Part 1

Arturo Cuicas
Jan 31, 2023


We continue with the FastAPI series. This time we are going to connect our application to a PostgreSQL database and write a couple of tests with Pytest, all on Docker so that we have the necessary infrastructure. In short, we will see:

  • API Dockerization.
  • Infrastructure with Docker Compose. (DB — API)
  • SQLModel for tables.
  • Connection with PostgreSQL.
  • Testing with Pytest.

The first thing we are going to do is clone our previous project and Dockerize it. We will do this in the simplest way possible; in future chapters we will use a multi-stage build so that it gets closer to a production image.

First of all, we create our Dockerfile, where we indicate the Python image that we are going to use and install the dependencies through Poetry.

FROM python:3.10  

ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1

WORKDIR /home/app
COPY ./pyproject.toml ./poetry.lock* ./

RUN pip install poetry
RUN poetry install

CMD ["poetry", "run", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]

We will start with something straightforward and easy to understand:

  1. First of all, we indicate which version of Python we want to use, in this case, it will be 3.10.
  2. Then we add a couple of environment variables for Python.
  3. We indicate what our work Directory will be.
  4. We copy our local Poetry files and paste them into the container.
  5. We install Poetry.
  6. We install the dependencies with Poetry.
  7. We execute the command to run our service in reload mode and on port 8000

Now we go with our Docker Compose file:

version: "3.9"

services:
  fastapi_service:
    build:
      context: ./
      dockerfile: Dockerfile
    hostname: fastapi_service
    container_name: fastapi_service
    depends_on:
      - db_postgres
    ports:
      - "8000:8000"
    env_file:
      - .env
    volumes:
      - ./:/home/app
    networks:
      - my-net

  db_postgres:
    image: postgres:14.3-alpine
    hostname: db_postgres
    container_name: db_postgres
    restart: on-failure
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    ports:
      - "5432:5432"
    volumes:
      - db-postgres:/var/lib/postgresql/data
    networks:
      - my-net

volumes:
  db-postgres:
    driver: local

networks:
  my-net:
    external: true

Here we indicate how to start our database. I have already explained this in another article, so I am not going to go into details. For our service, we indicate where our Dockerfile is, what name the container will have, and where it is going to get its environment variables. Because my-net is declared as an external network, we must also create it ourselves so that Docker Compose can work:

docker network create my-net

Now we can run docker compose up --build to build and start the project:

docker compose up --build

Continuing with the previous project, we are going to create a Transactions table that simulates each entry or exit of money from a Wallet. This table will have the following columns: id, created_at, description, and amount.

For this, we are going to create a folder called db, where we will place everything related to our connection to the database. Inside it, a sessions file will hold our connection, a folder called tables will hold our table declarations, and a repositories folder will hold the methods that operate on those tables.

.
├── __init__.py
├── repositories
│   ├── __init__.py
│   └── transactions.py
├── sessions.py
└── tables
    ├── __init__.py
    └── transactions.py

To establish the session we are going to place the variables in our config file and to create the tables and connect to our database we are going to use SQLModel and SQLAlchemy.

Let’s start by installing SQLModel, SQLAlchemy, Psycopg2, and Asyncpg.

poetry add sqlmodel
poetry add sqlalchemy
poetry add psycopg2-binary
poetry add asyncpg

Now we are going to create our first table. Before we start adding columns, we are going to create a base file for our tables (base_class.py inside the tables folder), where we will place our auxiliary classes and columns.

from uuid import uuid4, UUID
from datetime import datetime

from sqlalchemy import text
from sqlmodel import Field, SQLModel


class UUIDModel(SQLModel):
    id: UUID = Field(
        default_factory=uuid4,
        primary_key=True,
        index=True,
        nullable=False,
    )


class TimestampModel(SQLModel):
    created_at: datetime = Field(
        default_factory=datetime.utcnow,
        nullable=False,
        sa_column_kwargs={"server_default": text("current_timestamp(0)")},
    )

Here we can see two classes. With UUIDModel we can automatically generate the id of each record in our table, and with TimestampModel we can automatically set the creation date of our records. Now let's move on to our transactions table and see how to compose it with these helper classes:

from sqlmodel import SQLModel

from db.tables.base_class import UUIDModel, TimestampModel


class TransactionBase(SQLModel):
    amount: int
    description: str


class Transaction(TransactionBase, UUIDModel, TimestampModel, table=True):
    ...

Now we are going to need to set our environment variables to connect to our database:

TITLE="Local FastAPI Postgres and Pytest"  
DESCRIPTION="FastAPI Postgres and Pytest"
OPENAPI_PREFIX=""
DEBUG=True

POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
POSTGRES_SERVER=db_postgres
POSTGRES_PORT=5432
POSTGRES_DB=postgres

And in our config.py:

postgres_user: str = os.environ.get("POSTGRES_USER")
postgres_password: str = os.environ.get("POSTGRES_PASSWORD")
postgres_server: str = os.environ.get("POSTGRES_SERVER")
postgres_port: int = int(os.environ.get("POSTGRES_PORT"))
postgres_db: str = os.environ.get("POSTGRES_DB")
db_echo_log: bool = True if os.environ.get("DEBUG") == "True" else False

@property
def sync_database_url(self) -> str:
    return f"postgresql://{self.postgres_user}:{self.postgres_password}@{self.postgres_server}:{self.postgres_port}/{self.postgres_db}"
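One detail worth keeping in mind with a connection string built this way: if the password ever contains characters such as "@" or "/", they need to be percent-encoded or the URL will be parsed incorrectly. The sketch below shows one way to do that with the standard library; build_database_url is a hypothetical helper, not part of the project, and the sample password is made up.

```python
from urllib.parse import quote_plus


def build_database_url(
    user: str,
    password: str,
    server: str,
    port: int,
    db: str,
    driver: str = "postgresql",
) -> str:
    # Percent-encode the credentials so characters such as "@" or "/"
    # cannot be mistaken for URL delimiters.
    return (
        f"{driver}://{quote_plus(user)}:{quote_plus(password)}"
        f"@{server}:{port}/{db}"
    )


# Same values as the .env file shown above.
plain = build_database_url("postgres", "postgres", "db_postgres", 5432, "postgres")
# A made-up password with special characters.
tricky = build_database_url("postgres", "p@ss/word", "db_postgres", 5432, "postgres")

print(plain)
print(tricky)
```

With the credentials used in this tutorial, the result is identical to the f-string in the property, so this only matters once real passwords are involved.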

Now that we have our data for the connection, we are going to create our engine in our sessions.py file to connect and create our table.

from sqlmodel import SQLModel, create_engine, Session

from core.config import settings
from db.tables.transactions import Transaction


engine = create_engine(
    url=settings.sync_database_url,
    echo=settings.db_echo_log,
)


def create_transaction():
    transaction = Transaction(amount=10, description="First transaction")

    with Session(engine) as session:
        session.add(transaction)
        session.commit()


def create_tables():
    SQLModel.metadata.drop_all(engine)
    SQLModel.metadata.create_all(engine)

As you can see, we create an engine to connect to our database. In create_tables we first delete our tables (drop_all) and then create them again (create_all), so any existing data is lost every time it runs. We also define create_transaction, a helper that inserts a dummy transaction.
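The effect of dropping and recreating the tables can be seen in a small experiment. This is only a sketch that uses the standard-library sqlite3 module with an in-memory database as a stand-in for PostgreSQL and SQLModel; reset_tables and count_rows are hypothetical helpers, and the table is called transactions here because transaction is a reserved word in SQLite.

```python
import sqlite3


def reset_tables(connection: sqlite3.Connection) -> None:
    # Counterpart of drop_all followed by create_all: the table is removed
    # together with its rows and then recreated empty.
    connection.execute("DROP TABLE IF EXISTS transactions")
    connection.execute(
        "CREATE TABLE transactions (amount INTEGER, description TEXT)"
    )


def count_rows(connection: sqlite3.Connection) -> int:
    return connection.execute("SELECT COUNT(*) FROM transactions").fetchone()[0]


connection = sqlite3.connect(":memory:")
reset_tables(connection)
connection.execute("INSERT INTO transactions VALUES (10, 'First transaction')")
rows_before = count_rows(connection)

reset_tables(connection)  # a second reset wipes the row inserted above
rows_after = count_rows(connection)

print(rows_before, rows_after)
```

This is convenient while developing, but it is also the reason a reset like this should not be exposed against a database that holds data you care about.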

Now we are going to create an endpoint in our main.py file so we can delete the tables and create them:

from fastapi import FastAPI, status

from core.config import settings
from api.router import router
from db.sessions import create_tables


app = FastAPI(
    title=settings.title,
    version=settings.version,
    description=settings.description,
    openapi_prefix=settings.openapi_prefix,
    docs_url=settings.docs_url,
    openapi_url=settings.openapi_url
)

app.include_router(router, prefix=settings.api_prefix)


@app.get("/")
async def root():
    return {"Say": "Hello!"}


@app.get(
    "/init_tables",
    status_code=status.HTTP_200_OK,
    name="init_tables"
)
async def init_tables():
    create_tables()

Now we are ready to test and start creating records in our table.

We are going to execute our command to create tables and we can do it through the documentation or the command line:

❯ http http://localhost:8000/init_tables
HTTP/1.1 200 OK
content-length: 4
content-type: application/json
date: Sun, 29 Jan 2023 20:55:54 GMT
server: uvicorn

null
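The null body and the content-length of 4 in this response come from the fact that init_tables does not return anything, so the response is the JSON encoding of None. A minimal sketch with the standard library, where init_tables_sketch is a hypothetical stand-in for the endpoint body:

```python
import json


def init_tables_sketch() -> None:
    # Stand-in for the endpoint body: it does its work and returns nothing.
    return None


body = json.dumps(init_tables_sketch())
print(body, len(body))
```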

And if we check our PostgreSQL we should be able to see our transactions table:

docker exec -it db_postgres psql -U postgres
psql (14.3)
Type "help" for help.

postgres=# \c postgres
You are now connected to database "postgres" as user "postgres".
postgres=# \dt
          List of relations
 Schema |    Name     | Type  |  Owner
--------+-------------+-------+----------
 public | transaction | table | postgres
(1 row)

Now we are going to create the endpoints to create, list, get, and delete a transaction. In the route folder, we are going to delete our old bands file from the previous tutorial and create a new one called transactions.py, where we will place our new endpoints.

Let's start with creation. For this, we are going to need a couple of things before we continue:

  1. We are going to need an asynchronous connection to the database.
  2. We are going to need one or two Schemas (Read and Create).
  3. We are also going to need a repository where we have the methods to interact with our DB.

Let's start with our schemas, for which we are going to use the standard Read/Create pair. We are going to delete our old bands.py schema from the schemas folder and create a new one, transactions.py.

from db.tables.transactions import TransactionBase


class TransactionCreate(TransactionBase):
    ...


class TransactionRead(TransactionBase):
    ...

Now we are going to create the repository to interact with our DB:

from sqlmodel.ext.asyncio.session import AsyncSession

from db.tables.transactions import Transaction
from schemas.transactions import TransactionCreate, TransactionRead


class TransactionRepository:
    def __init__(self, session: AsyncSession) -> None:
        self.session = session

    # Must be a coroutine (async def) because it awaits the session.
    async def create(self, transaction_create: TransactionCreate) -> TransactionRead:
        db_transaction = Transaction.from_orm(transaction_create)
        self.session.add(db_transaction)
        await self.session.commit()
        # Reload the row so that database-generated values are available.
        await self.session.refresh(db_transaction)

        return TransactionRead(
            amount=db_transaction.amount,
            description=db_transaction.description
        )

As you can see, the session that we will pass to the class will be of the asynchronous type. We must create an asynchronous engine with its respective configuration:

First, we are going to create a new property in our config.py that returns our asynchronous connection string:

@property  
def async_database_url(self) -> str:
return f"postgresql+asyncpg://{self.postgres_user}:{self.postgres_password}@{self.postgres_server}:{self.postgres_port}/{self.postgres_db}"

Then, in our sessions.py file, we are going to configure our asynchronous engine, leaving the file as follows:

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlmodel import SQLModel, create_engine, Session

from core.config import settings
from db.tables.transactions import Transaction


engine = create_engine(
    url=settings.sync_database_url,
    echo=settings.db_echo_log,
)

async_engine = create_async_engine(
    url=settings.async_database_url,
    echo=settings.db_echo_log,
    future=True,
)

async_session = sessionmaker(
    bind=async_engine, class_=AsyncSession, expire_on_commit=False
)


def create_transaction():
    transaction = Transaction(amount=10, description="First transaction")

    with Session(engine) as session:
        session.add(transaction)
        session.commit()


def create_tables():
    SQLModel.metadata.drop_all(engine)
    SQLModel.metadata.create_all(engine)
    create_transaction()

Now we are going to create a directory of dependencies to be injected into our endpoints. We start with repositories.py, which will help us inject our repository through Depends. In it, we create a function called get_repository, which in turn receives the session through dependency injection.

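from typing import AsyncGenerator

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from db.sessions import async_session


# get_db is an async generator, so it is annotated as one.
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        yield session
        # Runs after the endpoint has finished using the session.
        await session.commit()


def get_repository(repository):
    # Returns a dependency that builds the given repository with the session.
    def _get_repository(session: AsyncSession = Depends(get_db)):
        return repository(session)

    return _get_repository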

As you can see, we have a function that receives a repository type and returns a dependency that builds it with the asynchronous session injected, ready to be consumed by the endpoint:

from fastapi import APIRouter, status, Body, Depends

from api.dependencies.repositories import get_repository
from db.repositories.transactions import TransactionRepository
from schemas.transactions import TransactionRead, TransactionCreate

router = APIRouter()


@router.post(
    "/transactions",
    response_model=TransactionRead,
    status_code=status.HTTP_201_CREATED,
    name="create_transaction"
)
async def create_transaction(
    transaction_create: TransactionCreate = Body(...),
    repository: TransactionRepository = Depends(get_repository(TransactionRepository)),
) -> TransactionRead:

    return await repository.create(transaction_create=transaction_create)

As you can see, unlike our band endpoint from the previous tutorial, here we are passing the repository that the function will use through the FastAPI Depends method.
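The two ideas behind this dependency, a session that is committed after the consumer is done and a factory that returns a builder for any repository class, can be tried in isolation. The sketch below uses only the standard library and approximates the wiring that FastAPI does through Depends by hand; FakeSession, TransactionRepositorySketch, and handle_request are hypothetical names, and asynccontextmanager is used here as an approximation of how a dependency with yield behaves.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records commits."""

    def __init__(self) -> None:
        self.committed = False

    async def commit(self) -> None:
        self.committed = True


@asynccontextmanager
async def get_db():
    session = FakeSession()
    yield session
    # Runs only after the consumer has finished with the session.
    await session.commit()


class TransactionRepositorySketch:
    def __init__(self, session: FakeSession) -> None:
        self.session = session


def get_repository(repository):
    # Returns a callable that builds `repository` around a given session.
    def _get_repository(session: FakeSession):
        return repository(session)

    return _get_repository


async def handle_request():
    build = get_repository(TransactionRepositorySketch)
    async with get_db() as session:
        repo = build(session)
        committed_during_request = repo.session.committed
    return repo, committed_during_request


repo, committed_during_request = asyncio.run(handle_request())
print(type(repo).__name__, committed_during_request, repo.session.committed)
```

Because get_repository takes the class as an argument, the same function can serve any future repository without changes.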

❯ http --json POST http://localhost:8000/api/transactions/transactions amount:=10 description="First Pay"
HTTP/1.1 201 Created
content-length: 35
content-type: application/json
date: Sun, 29 Jan 2023 23:04:36 GMT
server: uvicorn

{
    "amount": 10,
    "description": "First Pay"
}

If we look at the console, thanks to echo we can see how SQLAlchemy does the insert and then the select so we can return the record.

INFO sqlalchemy.engine.Engine BEGIN (implicit)

INFO sqlalchemy.engine.Engine INSERT INTO transaction (created_at, id, amount, description) VALUES (%s, %s, %s, %s)

INFO sqlalchemy.engine.Engine [cached since 190.1s ago] (datetime.datetime(2023, 1, 29, 23, 5, 40, 525503), UUID('ea075df2-e451-496c-b133-093a302ec9ec'), 10, 'First Pay')

INFO sqlalchemy.engine.Engine COMMIT

INFO sqlalchemy.engine.Engine BEGIN (implicit)
INFO sqlalchemy.engine.Engine SELECT transaction.created_at, transaction.id, transaction.amount, transaction.description
FROM transaction
WHERE transaction.id = %s
INFO sqlalchemy.engine.Engine [cached since 190.1s ago] (UUID('ea075df2-e451-496c-b133-093a302ec9ec'),)

INFO sqlalchemy.engine.Engine COMMIT

Now we will return a list of transactions. First, in our repository, we add our list method:

...
    # Needs: from sqlmodel import select
    async def list(self) -> list[TransactionRead]:
        results = await self.session.execute(
            select(Transaction)
        )

        return [
            TransactionRead(
                amount=transaction.amount,
                description=transaction.description
            )
            for transaction in results.scalars()
        ]

Then, in our route, we add the get_transactions endpoint with the GET method:

# Needs: from typing import Optional
@router.get(
    "/transactions",
    response_model=list[Optional[TransactionRead]],
    status_code=status.HTTP_200_OK,
    name="get_transactions"
)
async def get_transactions(
    repository: TransactionRepository = Depends(get_repository(TransactionRepository)),
) -> list[Optional[TransactionRead]]:
    return await repository.list()

If we try it:

❯ http http://localhost:8000/api/transactions/transactions
HTTP/1.1 200 OK
content-length: 98
content-type: application/json
date: Sun, 29 Jan 2023 23:39:04 GMT
server: uvicorn

[
    {
        "amount": 10,
        "description": "First transaction"
    },
    {
        "amount": 20,
        "description": "Second Transaction"
    }
]

Now we are going to create the methods to return and delete a transaction, each one with its endpoint. But first, we are going to create a generic exception for the cases in which our queries return None, that is, when the record is not found.

In our db folder, we are going to create an errors.py file where we are going to create our generic exception:

class EntityDoesNotExist(Exception):
    """Raised when entity was not found in database."""

And in our repository we add the get and delete methods:

    # Needs: from typing import Optional, from uuid import UUID
    # and from db.errors import EntityDoesNotExist
    async def get(self, transaction_id: UUID) -> Optional[TransactionRead]:
        transaction = await self.session.get(Transaction, transaction_id)

        if transaction is None:
            raise EntityDoesNotExist

        return TransactionRead(
            amount=transaction.amount,
            description=transaction.description
        )

    async def delete(self, transaction_id: UUID) -> None:
        transaction = await self.session.get(Transaction, transaction_id)

        if transaction is None:
            raise EntityDoesNotExist

        return await self.session.delete(transaction)

Later we are going to improve these methods. For now, the important thing is that we can get a transaction by id and delete it, so let's look up an id in the DB:

❯ docker exec -it db_postgres psql -U postgres
psql (14.3)
Type "help" for help.

postgres=# select * from transaction;
         created_at         |                  id                  | amount |    description
----------------------------+--------------------------------------+--------+--------------------
 2023-01-29 23:37:53.376321 | 9f56b2d4-9f61-4989-8a49-147f55139004 |     10 | First transaction
 2023-01-29 23:38:39.676013 | e3f034dd-f86c-4526-b19b-d45dc7778a07 |     20 | Second Transaction
(2 rows)

Now let's prepare our endpoints:

# Needs: from uuid import UUID, from fastapi import HTTPException
# and from db.errors import EntityDoesNotExist
@router.get(
    "/transactions/{transaction_id}",
    response_model=TransactionRead,
    status_code=status.HTTP_200_OK,
    name="get_transaction"
)
async def get_transaction(
    transaction_id: UUID,
    repository: TransactionRepository = Depends(get_repository(TransactionRepository)),
) -> TransactionRead:
    try:
        # Query once and translate a missing record into a 404.
        return await repository.get(transaction_id=transaction_id)
    except EntityDoesNotExist:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Transaction not found!"
        )


@router.delete(
    "/transactions/{transaction_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    name="delete_transaction"
)
async def delete_transaction(
    transaction_id: UUID,
    repository: TransactionRepository = Depends(get_repository(TransactionRepository)),
) -> None:
    try:
        # delete() raises EntityDoesNotExist itself when the id is unknown.
        await repository.delete(transaction_id=transaction_id)
    except EntityDoesNotExist:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Transaction not found!"
        )
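The translation from a repository exception to an HTTP status can be exercised without FastAPI or a database. The following is a sketch under stated assumptions: InMemoryTransactionRepository is a hypothetical dictionary-backed stand-in for TransactionRepository, and delete_status returns a plain integer where the real endpoint raises HTTPException.

```python
import asyncio
from uuid import UUID, uuid4


class EntityDoesNotExist(Exception):
    """Raised when entity was not found in database."""


class InMemoryTransactionRepository:
    """Hypothetical dictionary-backed stand-in for TransactionRepository."""

    def __init__(self) -> None:
        self._rows: dict[UUID, dict] = {}

    async def create(self, amount: int, description: str) -> UUID:
        transaction_id = uuid4()
        self._rows[transaction_id] = {"amount": amount, "description": description}
        return transaction_id

    async def delete(self, transaction_id: UUID) -> None:
        if transaction_id not in self._rows:
            raise EntityDoesNotExist
        del self._rows[transaction_id]


async def delete_status(repository, transaction_id: UUID) -> int:
    # A missing record is translated into 404, as the endpoints do
    # with HTTPException; a successful delete maps to 204.
    try:
        await repository.delete(transaction_id)
    except EntityDoesNotExist:
        return 404
    return 204


async def main() -> list[int]:
    repository = InMemoryTransactionRepository()
    known_id = await repository.create(10, "First transaction")
    return [
        await delete_status(repository, known_id),
        await delete_status(repository, known_id),  # already deleted
        await delete_status(repository, uuid4()),   # never existed
    ]


statuses = asyncio.run(main())
print(statuses)
```

Keeping the exception generic in the db layer means the repository knows nothing about HTTP; only the endpoint decides that a missing record is a 404.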

With these endpoints, we get an HTTP 404 exception when the record is not found. For now, we are going to leave it here since the article is getting quite long. Finally, we are going to install Black, isort, and Flake8 to organize and clean up the code a bit before uploading it to the repository.

poetry add black flake8 isort --dev
poetry run black .
poetry run isort . --profile black
poetry run flake8 .

Next Part

In the next part we will see:

  1. Pagination for list transactions.
  2. Patch and Delete (soft) methods.
  3. Tests for our code with Pytest.
  4. A Makefile, to make life easier!

Part 2 is now available!!!

Source Code

https://github.com/arturocuicas/fastapi_postgres_pytests
