FastAPI and Apache Kafka

Arturo Cuicas
Mar 25, 2023

Continuing with the FastAPI series, it is now the turn of our stream processor of preference, Apache Kafka. I must clarify that Kafka is neither the best nor the worst Pub/Sub system in the development world, but in cases where we want real-time streaming of events, Kafka is a great ally, besides having endless configuration options that let us manage how events are communicated.

After my previous posts, I did not want this one to be so long, so this time I will be more direct and will not go into much Kafka theory. What I want to do in this post is build a fun, practical example with Kafka, and for that I thought of making a game between microservices!

The game starts with an initial message containing the names of the three actors who played Spider-Man in No Way Home (Tom Holland, Andrew Garfield, and Tobey Maguire). The first name in each message is removed from the list, with the detail that each time a Spidey is selected, he must randomly reorder the remaining names and send the list back to the topic.
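
To make the flow concrete, here is a hypothetical round, showing the JSON payloads that will travel through the topic (each message is simply a list of the remaining names):

["Tobey Maguire", "Tom Holland", "Andrew Garfield"]   <- initial message: Tobey is selected
["Andrew Garfield", "Tom Holland"]                    <- Tobey removes himself, reshuffles, resends
["Tom Holland"]                                       <- Andrew removes himself; Tom is the last one left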

In summary, we are going to need the following:

  • A server to send the initial message (We are also going to send it by console).
  • A FastAPI Server that we are going to clone for each Spidey.
  • A Kafka Broker and a Zookeeper (or an image with both together).
  • A graphical interface to view the topics and messages (Kafdrop, Kafka UI, Confluent).
  • A docker-compose to orchestrate the architecture.

Let’s get to work!

Now we are going to initialize our project with Poetry and install the necessary dependencies:

poetry init
...
poetry add fastapi uvicorn pydantic
poetry add python-dotenv
poetry add aiokafka
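
After these commands, the dependencies section of your pyproject.toml should look something like this (the exact pinned versions will depend on when you run poetry add, so take the ones below as placeholders):

[tool.poetry.dependencies]
python = "^3.10"
fastapi = "*"          # poetry will pin a caret version here
uvicorn = "*"
pydantic = "*"
python-dotenv = "*"
aiokafka = "*"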

Let’s continue with the structure of the project. We are going to keep it simple: a single image that we will clone for each Spidey.

├── docker-compose.yaml
├── Dockerfile
├── .env
├── main.py
├── poetry.lock
└── pyproject.toml

To get a better perspective, let’s start with our docker-compose.yaml so we have a general notion of what we are going to create:

version: "3.9"
services:
  control_service:
    build:
      context: ./
      dockerfile: Dockerfile
    hostname: control_service
    container_name: control_service
    depends_on:
      kafka:
        condition: service_healthy
    ports:
      - "8000:8000"
    environment:
      - MY_NAME="Spiderweb Control"
    env_file:
      - .env
    volumes:
      - ./:/home/app
    networks:
      - my-net

  spidey_1:
    build:
      context: ./
      dockerfile: Dockerfile
    hostname: spidey_1
    container_name: spidey_1
    depends_on:
      kafka:
        condition: service_healthy
    ports:
      - "8001:8000"
    environment:
      - MY_NAME=Tobey Maguire
    env_file:
      - .env
    volumes:
      - ./:/home/app
    networks:
      - my-net

  spidey_2:
    build:
      context: ./
      dockerfile: Dockerfile
    hostname: spidey_2
    container_name: spidey_2
    depends_on:
      kafka:
        condition: service_healthy
    ports:
      - "8002:8000"
    environment:
      - MY_NAME=Andrew Garfield
    env_file:
      - .env
    volumes:
      - ./:/home/app
    networks:
      - my-net

  spidey_3:
    build:
      context: ./
      dockerfile: Dockerfile
    hostname: spidey_3
    container_name: spidey_3
    depends_on:
      kafka:
        condition: service_healthy
    ports:
      - "8003:8000"
    environment:
      - MY_NAME=Tom Holland
    env_file:
      - .env
    volumes:
      - ./:/home/app
    networks:
      - my-net

  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "8080:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:29092
    networks:
      - my-net

  kafka:
    image: obsidiandynamics/kafka
    container_name: kafka
    restart: "no"
    ports:
      - "2181:2181"
      - "9093:9093"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9093"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:29092,EXTERNAL://localhost:9093"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"
    healthcheck:
      test: nc -z localhost 29092 || exit
      interval: 10s
      timeout: 5s
      retries: 15
    networks:
      - my-net

networks:
  my-net:
    external: true

As you can see, we will have a single Docker image for the three Spideys and the control service. In addition, we will have Kafka UI, with which we will manage the messages visually, and a Kafka image that includes both Kafka and Zookeeper!
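
One detail that is easy to miss: the my-net network is declared as external, so Compose will not create it for us. Create it once before the first docker compose up:

docker network create my-net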

Another very important detail is that each service has a MY_NAME environment variable, which each service will use to identify itself.

Now let’s go with the image, which will have a reasonably simple Dockerfile:

FROM python:3.10  

ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
ENV PYTHONPATH /home/app

WORKDIR /home/app
COPY ./pyproject.toml ./poetry.lock* ./

RUN pip install poetry
RUN poetry install

CMD ["poetry", "run", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]

We will need a .env file with environment variables that will be the same across services:

SPIDEY_NAMES="Tom Holland,Andrew Garfield,Tobey Maguire"  
KAFKA_BOOTSTRAP_SERVERS="kafka:29092"
SPIDERWEB_TOPIC="spiderweb"

Where:

  • SPIDEY_NAMES = our list of Spideys.
  • KAFKA_BOOTSTRAP_SERVERS = the URL of our Kafka broker.
  • SPIDERWEB_TOPIC = the topic we will use for the game.

As you can see, we will need to create this topic in Kafka. We will do it in two ways: through the UI and through the console:

Through UI:

docker compose up --build

Now you can open Kafka UI in the browser at http://127.0.0.1:8080/ and create the spiderweb topic from there.

In console:

docker exec -it kafka bash
bash-4.4# ./opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:29092 --list
spiderweb <-- Topic created through the UI
bash-4.4# ./opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:29092 --create --topic other-spiderweb --partitions 1 --replication-factor 1
bash-4.4# ./opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:29092 --list
other-spiderweb <-- New Topic
spiderweb <-- Topic created through the UI
bash-4.4#

Nice! We have already created our topics to be used.
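
If you want to watch the game live from the console, you can also tail the topic with the console consumer that ships with the image (a handy sanity check, not strictly required):

docker exec -it kafka bash
bash-4.4# ./opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic spiderweb --from-beginning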

Let’s go with our main.py file, which will hold all the logic, starting with our control endpoint. We are going to need a Kafka Consumer and a Producer, plus a couple of functions to handle the messages:

import asyncio
import json
import os
from typing import List
from random import shuffle

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from dotenv import load_dotenv
from fastapi import FastAPI

app = FastAPI()
load_dotenv()
loop = asyncio.get_event_loop()

spidey_names: List[str] = os.environ.get("SPIDEY_NAMES").split(",")
kafka_bootstrap_servers: str = os.environ.get("KAFKA_BOOTSTRAP_SERVERS")
spiderweb_topic: str = os.environ.get("SPIDERWEB_TOPIC")
my_name: str = os.environ.get("MY_NAME")

# Maps the number of names left in the list to the place the selected Spidey takes.
mapping_place = {
    3: "name is the Winner!!!",
    2: "name is the Second Place!!!",
    1: "name is the third Place!!!",
}


def spidey_random(spidey_list: List) -> List:
    # Shuffle the list of Spideys in place and return it.
    shuffle(spidey_list)
    return spidey_list


async def play_turn(finalists: List):
    # Reshuffle the remaining finalists and send them back to the topic.
    spidey_order = spidey_random(finalists)
    await send_one(topic=spiderweb_topic, msg=spidey_order)


def kafka_serializer(value):
    # Serialize the outgoing message as JSON bytes.
    return json.dumps(value).encode()


def encode_json(msg):
    # Decode an incoming Kafka message back into a Python object.
    to_load = msg.value.decode("utf-8")
    return json.loads(to_load)


def check_spidey(finalists: List) -> bool:
    # It is our turn when our name is first in the list.
    return my_name == finalists[0]


async def send_one(topic: str, msg: List):
    try:
        producer = AIOKafkaProducer(
            bootstrap_servers=kafka_bootstrap_servers
        )
        await producer.start()

        try:
            await producer.send_and_wait(topic, kafka_serializer(msg))
        finally:
            await producer.stop()

    except Exception as err:
        print(f"Some Kafka error: {err}")


async def spiderweb_turn(msg):
    finalists = encode_json(msg)
    is_my_turn = check_spidey(finalists)

    if is_my_turn:
        print(mapping_place[len(finalists)].replace('name', my_name))

        if len(finalists) > 1:
            finalists.pop(0)
            await play_turn(finalists)


kafka_actions = {
    "spiderweb": spiderweb_turn,
}


async def consume():
    consumer = AIOKafkaConsumer(
        spiderweb_topic,
        loop=loop,
        bootstrap_servers=kafka_bootstrap_servers,
    )

    try:
        await consumer.start()

    except Exception as e:
        print(e)
        return

    try:
        async for msg in consumer:
            # Dispatch each message to the handler registered for its topic.
            await kafka_actions[msg.topic](msg)

    finally:
        await consumer.stop()


@app.on_event("startup")
async def startup_event():
    # Launch the consumer as a background task once the event loop is running.
    asyncio.create_task(consume())


@app.get("/")
async def root():
    return {"Kafka": "Spiderweb"}


@app.get("/start")
async def start_game():
    spidey_order = spidey_random(spidey_names)
    await send_one(topic=spiderweb_topic, msg=spidey_order)

    return {"order": spidey_order}

As you can see, in this file we first have the FastAPI instance, the Asyncio event loop, and Dotenv; then we read all our environment variables.

I will summarize each function:

  • spidey_random = randomly reorders the list of Spideys.
  • play_turn = sends the newly reordered list through Kafka.
  • kafka_serializer = encodes the outgoing message.
  • encode_json = decodes the received message.
  • check_spidey = verifies that the first name in the list matches this service’s name.
  • send_one = sends a message to the selected Kafka topic.
  • spiderweb_turn = the logic that evaluates a Spidey’s turn and the place he finished in.
  • consume = waits for messages from the Kafka topic that we have selected.
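
To see the elimination logic in isolation, here is a minimal offline simulation of a round (no Kafka involved), using the same mapping_place messages as the real code:

# A quick local sanity check of the game logic, without Kafka:
# the Spidey at the front of the shuffled list takes each turn.
from random import shuffle

names = ["Tom Holland", "Andrew Garfield", "Tobey Maguire"]
mapping_place = {
    3: "name is the Winner!!!",
    2: "name is the Second Place!!!",
    1: "name is the third Place!!!",
}

shuffle(names)
while names:
    print(mapping_place[len(names)].replace("name", names[0]))
    names.pop(0)
    shuffle(names)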

Finally, we have two endpoints: one for checking that the application works and another for starting the game. Let’s try the game!
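
For example, we can kick off a round with curl against the control service (the order in the response is random, so yours will differ):

curl http://127.0.0.1:8000/start
{"order": ["Tobey Maguire", "Tom Holland", "Andrew Garfield"]}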

Now we check the docker-compose logs:

As you can see, in the first shuffle Tobey wins and second place would go to Tom; however, in the second shuffle the positions change: Andrew takes second place and Tom ends up in third :’(
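
Reconstructed from the print messages in main.py (your shuffles will differ), the logs look something like this:

spidey_1  | Tobey Maguire is the Winner!!!
spidey_2  | Andrew Garfield is the Second Place!!!
spidey_3  | Tom Holland is the third Place!!!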

Conclusion

Using Apache Kafka to propagate events between microservices is really simple and provides many advantages; it is also easy to debug and trace. It is true that a certain level of knowledge is required, but the learning curve is quite smooth, and it can be flattened further if you use a third-party broker such as Amazon MSK or Confluent.

I hope you have enjoyed this example of using Apache Kafka; for me, as a Spider-Man fan, it was a lot of fun.

Here are some links of interest:

Source Code

Reference

Have a great day!


Arturo Cuicas

Hi friends! I'm passionate about learning and teaching technologies. I currently work as a Tech Lead and enjoy working in a team to achieve our goals.