Event Driven Shopping App with Python, Kafka & BigQuery

Joao Paulo Alvim
10 min read · Jan 31, 2023


Create a scalable end to end solution to capture data and generate real-time analytics in GCP

Disclaimer: this is NOT a system design tutorial. The purpose of this article is to give a basic overview of how the mentioned tools can be connected and used to facilitate real-time analytics. (I am not a system design expert, either.)

The tutorial assumes basic knowledge of Kafka and Docker.

All code is available in my GitHub repo.

Let’s connect!

Introduction

In today’s fast-paced world, businesses are constantly looking for ways to gain a competitive edge by leveraging real-time data. This is especially true in the e-commerce industry, where having access to timely and accurate data can help businesses make informed decisions and improve the customer experience. This is where event-driven architecture can play a crucial role.

In this article, we will explore how to create an event-driven shopping app using Python, Kafka, and BigQuery. We will look at how to set up a scalable end-to-end solution that can capture data in real-time and generate insightful analytics and visualizations. Our goal is to provide a blueprint for businesses that want to use real-time data to drive growth and stay ahead of the competition. So, whether you’re a data engineer, analyst, or business leader, this article is for you.

The Design

We are going to build a backend solution for the following design:

Proposed Design

Consider that we have an online shopping app that frequently generates new orders from its customers.

Our goal is to capture these orders and use Apache Kafka to trigger events based on each data insertion.

  • Each order is sent to a Kafka topic by a producer.
  • A consumer reads each order and confirms or denies the transaction based on the user's credit card status. Another producer then sends the transaction confirmation back to Kafka.
  • Based on the successful transactions, we trigger two events: send an email to the user confirming the order, and send the record to BigQuery for further analytics.
  • Once the data is in BigQuery, we can create views and pipe them into Google Looker Studio for quick insights, or into another BI tool of your preference.

We will use Python to interact with Kafka, and Docker containers to host our Kafka processes.
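The scripts below rely on the kafka-python client (imported as kafka), plus pandas, pandas-gbq, and google-cloud-bigquery for the analytics step. If you are following along, something like pip install kafka-python pandas pandas-gbq google-cloud-bigquery in your Python environment should cover the dependencies.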

Docker-compose file

docker-compose.yml

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:5.3.1
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

This is a docker-compose file that sets up two services, zookeeper and kafka, using version 2 of the Compose file format.

zookeeper service:

  • Uses the confluentinc/cp-zookeeper:latest image.
  • Sets two environment variables: ZOOKEEPER_CLIENT_PORT to 2181 and ZOOKEEPER_TICK_TIME to 2000.
  • Maps the host’s port 22181 to the container’s port 2181.

kafka service:

  • Uses the confluentinc/cp-kafka:5.3.1 image.
  • Depends on the zookeeper service.
  • Maps the host’s port 29092 to the container’s port 29092.
  • Sets several environment variables:
      • KAFKA_BROKER_ID to 1.
      • KAFKA_ZOOKEEPER_CONNECT to zookeeper:2181.
      • KAFKA_ADVERTISED_LISTENERS to PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092.
      • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP to PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT.
      • KAFKA_INTER_BROKER_LISTENER_NAME to PLAINTEXT.
      • KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR to 1.

This docker-compose file sets up a basic Apache Kafka cluster with a single broker and a single instance of Apache Zookeeper. The zookeeper service acts as the coordination and discovery service for the Kafka cluster. The kafka service is the Kafka broker that will host the topics.
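The Confluent images will typically auto-create these topics the first time a producer writes to them, so no extra setup is strictly required. If you prefer to create the topics explicitly, a minimal sketch using kafka-python's admin client could look like the following (a single partition and replication factor of 1 are assumptions that match our single-broker setup):

from kafka.admin import KafkaAdminClient, NewTopic

# Connect to the broker that docker-compose exposes on the host
admin = KafkaAdminClient(bootstrap_servers="localhost:29092")

# Create the two topics used throughout this article
admin.create_topics([
    NewTopic(name="shopping_order_details", num_partitions=1, replication_factor=1),
    NewTopic(name="shopping_order_confirmed", num_partitions=1, replication_factor=1),
])

admin.close()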

Shopping Order Backend

This is the first component of our system and is responsible for simulating shopping orders from a shopping app. We will generate a new order every 5 seconds.

Let's explore the Python code.

shopping_order_backend.py

import json
import random
import time

from kafka import KafkaProducer

# Kafka topic
SHOPPING_ORDER_KAFKA_TOPIC = "shopping_order_details"

ORDER_LIMIT = 20_000

# Producer that writes to Kafka
producer = KafkaProducer(
    # server where Kafka is running
    bootstrap_servers="localhost:29092"
)

print("Generating a new shopping order every 5 seconds")

# List of country names
country_names = ["USA", "Canada", "Mexico", "Brazil", "Argentina",
                 "Colombia", "Peru", "Venezuela", "Chile", "Ecuador"]

# List of item IDs
item_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# Counters for unique IDs
user_id_counter = 1
order_id_counter = 1

while True:
    # Generate the order dictionary
    order = {
        "order_id": order_id_counter,
        "user_id": user_id_counter,
        "country_name": random.choice(country_names),
        "item_id": random.choice(item_ids),
        "item_price": random.uniform(10, 100),
        "email": f"{user_id_counter}@email.com",
        "card_status": random.choice(["approved", "denied"])
    }

    # Send the order to the topic
    producer.send(
        SHOPPING_ORDER_KAFKA_TOPIC,
        # encode the data
        json.dumps(order).encode("utf-8")
    )
    print(order)
    print(f"Done sending order from user_id: {user_id_counter}")

    # Increase counters
    user_id_counter += 1
    order_id_counter += 1

    time.sleep(5)

The code generates random shopping order data and sends it to a Kafka topic named "shopping_order_details". It uses the kafka-python library (imported as kafka) to interact with Apache Kafka.

It sets up a Kafka producer by initializing a KafkaProducer instance with bootstrap_servers parameter set to “localhost:29092”.

The data consists of several fields such as order_id, user_id, country_name, item_id, item_price, email and card_status. The values for the fields are randomly generated from a predefined list of country names and item IDs. The order data is first serialized to a JSON string using the “json.dumps” function, and then encoded to bytes using the “utf-8” encoding.

A while loop runs indefinitely and generates new order data every 5 seconds, which is sent to the Kafka topic using the “send” method. The generated order data is also printed to the console. The unique ids of user and order are incremented after each iteration of the loop.
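As a side note, kafka-python can also handle the JSON encoding for you: if you pass a value_serializer when creating the producer, you can send plain dictionaries and drop the explicit json.dumps(...).encode(...) call. A small sketch of that variation:

producer = KafkaProducer(
    bootstrap_servers="localhost:29092",
    # serialize every outgoing value as UTF-8 encoded JSON
    value_serializer=lambda value: json.dumps(value).encode("utf-8")
)

# the order dictionary can now be sent directly
producer.send(SHOPPING_ORDER_KAFKA_TOPIC, order)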

Shopping Transaction

The next step is to create a transaction from an order. For this, we will read the stored order data and accept or decline the transaction based on the card status.

shopping_transaction.py

# Read from and write back to Kafka
import json

from kafka import KafkaConsumer, KafkaProducer

SHOPPING_ORDER_KAFKA_TOPIC = "shopping_order_details"
SHOPPING_ORDER_CONFIRMED_KAFKA_TOPIC = "shopping_order_confirmed"

consumer = KafkaConsumer(
    SHOPPING_ORDER_KAFKA_TOPIC,
    bootstrap_servers="localhost:29092"
)

producer = KafkaProducer(
    bootstrap_servers="localhost:29092"
)

print("Shopping Transactions Listening..")
while True:
    for message in consumer:
        print("Ongoing Shopping transaction.. ")
        consumed_message = json.loads(message.value.decode())
        print(consumed_message)

        card_status = consumed_message["card_status"]
        # First check if the transaction was successfully completed based on the card status
        if card_status == "approved":
            # send the confirmation to the confirmed-orders topic
            data = consumed_message
            print("Successful transaction, writing to topic")
            producer.send(
                SHOPPING_ORDER_CONFIRMED_KAFKA_TOPIC,
                json.dumps(data).encode("utf-8")
            )
        else:
            print("Order Denied, not writing to topic")

This script listens to the "shopping_order_details" topic and processes the order messages received from it. If the "card_status" field of the consumed message is "approved", it writes the entire message to another topic, "shopping_order_confirmed". If the "card_status" is "denied", it does not write anything. The bootstrap server for both the consumer and the producer is "localhost:29092". The consumed messages are deserialized and printed to the console.
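One note on scalability: as written, the consumer reads every message on its own. If you wanted to run several transaction workers in parallel, kafka-python supports consumer groups, which let Kafka split a topic's partitions among the members of the group (this only pays off if the topic has more than one partition). A minimal sketch, where the group_id value is just an illustrative name:

consumer = KafkaConsumer(
    SHOPPING_ORDER_KAFKA_TOPIC,
    bootstrap_servers="localhost:29092",
    # consumers sharing a group_id split the topic's partitions between them
    group_id="shopping-transactions",
    # start from the earliest available message when no offset has been committed yet
    auto_offset_reset="earliest"
)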

Now we are able to run two other processes based on the confirmed transactions: analytics and sending an email to the user.

Shopping email

This is a very simple piece of code, just for illustration purposes. It listens to the "shopping_order_confirmed" topic and retrieves the user email and order id from each message. Then it prints a message indicating that a confirmation email is being sent.

The actual email-sending code is not implemented (a possible sketch of it follows the listing below).

shopping_email.py

from kafka import KafkaConsumer
import json

SHOPPING_ORDER_CONFIRMED_KAFKA_TOPIC = "shopping_order_confirmed"

consumer = KafkaConsumer(
    SHOPPING_ORDER_CONFIRMED_KAFKA_TOPIC,
    bootstrap_servers="localhost:29092"
)

print("Email is listening..")

while True:
    for message in consumer:
        consumed_message = json.loads(message.value.decode())
        customer_email = consumed_message["email"]
        order_id = consumed_message["order_id"]
        print(f"Sending email to {customer_email}, referring to order {order_id}")

Shopping Analytics

Finally, we are able to produce some insights using the data.

This code, just like the email script, listens to the confirmed-transactions topic in the Apache Kafka stream. It then calculates the total number of orders and the total revenue, and inserts each confirmed transaction into a table in Google BigQuery for analytics.

For this step, you will need a Google BigQuery project and table, and a service account key to authenticate against your environment. Otherwise, you can modify the code to adapt it to your use case.
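If you still need to create the table, one option is to do it with the BigQuery Python client. The sketch below infers the column types from the order dictionary and from the dtype casts used in the analytics script; the project, dataset, and table names match the ones used there, so adjust them to your own environment:

import os
from google.cloud import bigquery

client = bigquery.Client.from_service_account_json(os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"))

# Schema matching the fields produced by the shopping order backend
schema = [
    bigquery.SchemaField("order_id", "INTEGER"),
    bigquery.SchemaField("user_id", "INTEGER"),
    bigquery.SchemaField("country_name", "STRING"),
    bigquery.SchemaField("item_id", "STRING"),
    bigquery.SchemaField("item_price", "FLOAT"),
    bigquery.SchemaField("email", "STRING"),
    bigquery.SchemaField("card_status", "STRING"),
]

table = bigquery.Table("data-eng-bootcamp-375819.orders_dataset.orders_table", schema=schema)
client.create_table(table)  # creates the (empty) table in the dataset

shopping_analytics.py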

import json
from google.cloud import bigquery
from kafka import KafkaConsumer
import os
import pandas as pd
from pandas_gbq import gbq

# Service account JSON key file path
key_file_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")

# Create BigQuery client
client = bigquery.Client.from_service_account_json(key_file_path)

# BigQuery project, dataset and table names
project_id = "data-eng-bootcamp-375819"
dataset_id = "orders_dataset"
table_id = "orders_table"

SHOPPING_ORDER_CONFIRMED_KAFKA_TOPIC = "shopping_order_confirmed"

consumer = KafkaConsumer(
    SHOPPING_ORDER_CONFIRMED_KAFKA_TOPIC,
    bootstrap_servers="localhost:29092"
)

total_orders_count = 0
total_revenue = 0

print("Listening to confirmed transactions..")
while True:
    for message in consumer:
        print("Updating analytics..")

        consumed_message = json.loads(message.value.decode())

        total_cost = float(consumed_message["item_price"])
        total_orders_count += 1
        total_revenue += total_cost

        print(f"Orders so far today: {total_orders_count}")
        print(f"Revenue so far today: {total_revenue}")
        print("Sending data to table in BigQuery")

        # Insert data into the table
        # Convert data to a Pandas DataFrame
        df = pd.DataFrame([consumed_message], columns=consumed_message.keys())
        df = df.astype({"order_id": int, "user_id": int, "country_name": str,
                        "item_id": str, "item_price": float, "email": str,
                        "card_status": str})
        gbq.to_gbq(df, f"{dataset_id}.{table_id}", project_id=project_id, if_exists="append")

        print("Order successfully sent to table in BigQuery!")

Putting it all together

Now that we have the necessary code, let’s go ahead and run the project.

  1. Start the Docker containers

Open a terminal and run docker-compose up -d

That’s all we need to start our environment. At this point your Kafka instance will be up and running.
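If you want to double-check before moving on, running docker-compose ps in the same directory should list both the kafka and zookeeper containers as up.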

2. Now we can run each Python script separately. I will run shopping_order_backend.py last, since I want to start generating data only after all the other consumers are listening to their topics.

I'm running each script in a different terminal and placing them side by side, so we can see everything happening at the same time.

1. python shopping_transactions.py

2. python shopping_email.py

3. python shopping_analytics.py

We can see that all of the consumers are waiting for some data to arrive, and this data will be generated by the shopping_order_backend.py script. As soon as it starts generating data, the pipeline will start to read and process it.

4. python shopping_order_backend.py

Now we can see what is happening here.

The top-right terminal generates order data every 5 seconds. The top-left terminal then accepts or denies each transaction based on the order's card_status, and all approved transactions are written to the confirmed topic.

The email and analytics scripts then pick up the approved transactions, sending an email to the user and saving the data to BigQuery, respectively.

Let’s look at each output individually:

shopping_order_backend.py generates orders and saves them to the topic.

python shopping_transactions.py consumes the order messages, validates each transaction and sends the approved ones to the confirmed topic.

python shopping_email.py consumes the transaction messages and sends an email to the users.

python shopping_analytics.py consumes the transactions, calculates some simple running totals and saves each message as a row in a BigQuery table.

Analyzing the data

Once our processes run for a while, we will have a BigQuery table that is being constantly updated:

On top of this table, we can create some views to answer some specific business questions, for example:

  • What are the most sold products per country?
  • Which product brings the highest revenue?

We can create these views on top of the table with the following SQL.

If you don’t know how to create tables in BigQuery, refer to this documentation.

CREATE VIEW `orders_dataset.top_products_per_country` AS (
  SELECT country_name, item_id, COUNT(item_id) AS frequency
  FROM `orders_dataset.orders_table`
  GROUP BY country_name, item_id
  ORDER BY frequency DESC
);

CREATE VIEW `orders_dataset.most_revenue` AS (
  SELECT item_id, SUM(item_price) AS total_price
  FROM `orders_dataset.orders_table`
  GROUP BY item_id
  ORDER BY SUM(item_price) DESC
);

We will then end up with the views created in our project:

Now we can pipe those views into whichever BI tool our process needs. In this case, let's use Google Looker Studio, since it's already integrated with BigQuery.

Just export the view to Looker Studio:

Based on the views, we can then create the necessary visualization for our business problem.

Note that once we refresh the dashboard, the visualizations update with the new rows added to the table by our Kafka streaming process, enabling real-time analytics:

First Snapshot:

Second Snapshot:

And we can notice the same for the other view:

First Snapshot:

Second Snapshot:

Conclusion

In conclusion, this article has demonstrated the implementation of an event-driven shopping app using Python, Kafka, and BigQuery. By using Kafka as the messaging broker, we were able to capture real-time events related to shopping transactions and stream them to BigQuery. This setup provides a scalable solution that enables the collection of large amounts of data in real time, providing immediate insights and analytics. Additionally, by integrating with Google Cloud Platform, the infrastructure and processing power required to handle the data volume can easily be scaled, ensuring the reliability and performance of the system. With an event-driven architecture and the powerful combination of Python, Kafka, and BigQuery, organizations can harness the power of real-time data to make informed decisions and drive growth.

Thank you for reading!

Reference

Code with Irtiza YouTube channel (check out this channel, it's amazing!)
