Build a real-time data analytics pipeline with Airbyte, Kafka, and Pinot

Learn how to use Airbyte, Kafka, and Pinot to build a data pipeline for a user-facing analytics dashboard.


This post was originally published at Airbyte.

Typically, a real-time analytics pipeline consists of several components, including ELT pipelines, an event streaming platform, and an analytical database that can answer queries at scale. However, building and maintaining such a platform is expensive and demands significant engineering effort and time.

Having open-source data engineering and analytics tools at your disposal helps reduce the cost of running a real-time analytics pipeline. Access to the source code also avoids vendor lock-in, giving you the flexibility to tailor the solution to your organization's needs. This article explores three popular open-source products in the data space, Airbyte, Apache Kafka, and Apache Pinot, to build a user-facing e-commerce dashboard that updates in real-time.

Airbyte is an open-source data integration platform capable of moving data from OLTP databases such as MySQL to destinations such as Apache Kafka using change data capture (CDC) with low latency. Apache Pinot is an open-source OLAP database capable of ingesting streaming data from Kafka and making it available for querying within seconds.

Real-time analytics pipeline architecture


This section discusses the key components of the solution, along with the rationale for choosing them for this project.

Why not directly query the operational OLTP database?

We derive the seller dashboard analytics by analyzing e-commerce orders. Currently, orders reside in a MySQL database, making it challenging to run analytical queries. The OLAP queries require aggregating and filtering a large batch of records to generate metrics, resulting in performance degradation in MySQL, which is not designed to handle such queries.

So we need to extract the orders from MySQL and move them to Apache Pinot for further analysis. We use Airbyte for that.

CDC pipeline to move orders from MySQL to Kafka

Airbyte's MySQL source extracts the orders from MySQL using change data capture (CDC).

A relational database like MySQL maintains a transaction log that records every state-changing operation, such as inserts, updates, and deletes. The CDC mechanism tails this transaction log to detect the database entities that have changed and streams them as change events. That way, we obtain incrementally updated orders and avoid costly full table extractions.
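
For MySQL, CDC relies on the binary log being enabled with row-based logging. If you are unsure about your server's configuration, a quick check along these lines (standard MySQL system variables; your values may differ) tells you whether CDC can work against it:

-- Verify the binary log settings that MySQL CDC depends on
SHOW VARIABLES LIKE 'log_bin';          -- should be ON
SHOW VARIABLES LIKE 'binlog_format';    -- should be ROW
SHOW VARIABLES LIKE 'binlog_row_image'; -- should be FULL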

Airbyte runs this extraction at a scheduled interval, for example, every hour, day, or week. Let's schedule it to run every five minutes to get fresher data. Extracted orders are written to a Kafka topic (orders) as JSON formatted events and streamed to Pinot from there.

Apache Pinot for fast OLAP querying on streaming data

The Airbyte ELT pipeline running every five minutes generates a massive amount of raw data to be analyzed. Also, the analyzed data will be exposed to all the sellers on the platform, forcing us to handle high query throughput at millisecond latencies. Hence, the analytics engine must be capable of running analytical queries and returning results in real time to ensure a good user experience. We will use Apache Pinot as the analytics engine to satisfy these needs.

I hope now you have a solid understanding of what we will build next. You can either follow along with the article or have a quick look at the finished solution in this GitHub repo.

Step 1: Prerequisites

The article assumes you have Docker Compose installed on your machine. For better performance, it is recommended to have at least 8GB of RAM and adequate disk space. Clone the following GitHub repository to your local machine and navigate to the project folder.

git clone https://github.com/dunithd/edu-samples
cd edu-samples/airbyte-pinot

Set up Apache Kafka and Apache Pinot

Next, we will create a single-node Kafka cluster and a multi-node Apache Pinot cluster with Docker Compose. The cloned project contains a docker-compose.yml file. Launch the Docker stack by running:

docker-compose up
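
Since docker-compose up keeps running in the foreground, you can confirm from a second terminal that the Kafka, Zookeeper, and Pinot containers are up:

docker-compose ps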

We will revisit this setup in the coming sections.

Set up MySQL

Once the Docker stack is running, let's create a MySQL database and load some mock e-commerce orders. The location of the database doesn't matter; it could be either a hosted MySQL instance or a local installation. For this article, let's use a local installation.

Create the ecommerce database, orders table, and insert mock data

Connect to your MySQL instance via the MySQL CLI or a GUI tool. Execute the following script to create the ecommerce database and the orders table, and to insert some mock orders. You can find this script inside the mysql directory of the accompanying GitHub repository.

mysql -u {username} -p < airbyte-pinot/mysql/ecommerce-schema.sql

Once the script completes, you can check the contents of the orders table by running:

select * from orders;

Create a dedicated user with access to the orders table

It is always recommended to grant Airbyte scoped permissions for accessing MySQL. We can do this by creating a dedicated MySQL user with the necessary privileges. To create a dedicated database user, run the following commands against your database.

CREATE USER 'airbyte'@'%' IDENTIFIED BY '<password>';

The required permissions depend on the replication method. While the STANDARD replication method only requires SELECT permissions, CDC replication requires SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT permissions.

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'airbyte'@'%';
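
You can double-check that the privileges were applied by listing the grants for the new user:

SHOW GRANTS FOR 'airbyte'@'%';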

Now, our ecommerce database is ready to be used with Airbyte.

Set up Airbyte

We will run Airbyte on the local machine as a separate Docker Compose project. You can follow these instructions to get it up and running.

Step 2: Sync data from MySQL to Kafka with Airbyte

Once Airbyte is running, we need to tell it where to read data from (the source), where to move the data to (the destination), and then create a connection between the two. That can be done using the Airbyte UI, which runs on localhost:8000.

Set up the MySQL CDC source

Log into the Airbyte UI, choose Sources > new source, and select MySQL as the type. Make sure to select CDC as the replication method. We are not going to use SSH for this example, but an SSH tunnel is recommended when connecting over the public internet.

Provide the following values in the UI. I'm connecting to my local MySQL installation, which runs on the default port (3306). Feel free to adjust the values based on your environment.

MySQL source settings

Set up the Kafka destination

Next, we will set up a Kafka destination in Airbyte to stream MySQL orders data in near real-time (minutes instead of seconds). Here, we are connecting to the single-node Kafka instance we started earlier. First, we will create a Kafka topic named orders, to which the CDC data will be written. Navigate to the cloned project, then run the following command from the root level.

docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create --topic orders
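
You can confirm that the topic was created by listing the topics on the broker:

docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --list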

In the Airbyte UI, select Destinations > new destination, and choose Kafka as the type. Most of the values can be left at their defaults, except for the Topic Pattern and the Bootstrap Servers.

Kafka destination settings

Create a MySQL CDC to Kafka pipeline

Once the source and destination are set up, you can create a connection from MySQL to Kafka in Airbyte, establishing a data pipeline between the two. In the "select the data you want to sync" section, choose the orders table and select Incremental under Sync mode.

Connection settings

Currently, you can set the sync interval as low as five minutes. If you need more frequent syncs, you can trigger them through the Airbyte API or integrate Airbyte with a workflow orchestration tool like Airflow, Prefect, or Dagster. Once configured, you can see the connection in the Connections tab.
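
As a rough sketch of the API option, assuming the Airbyte server is reachable on localhost:8000 and you have copied the connection ID from the connection's URL in the UI, a sync can be triggered with a single HTTP call:

curl -H "Content-Type: application/json" -X POST \
  -d '{"connectionId": "<your-connection-id>"}' \
  http://localhost:8000/api/v1/connections/sync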

Verify the data in Kafka

Once the sync job runs for the first time, it syncs all the orders in MySQL to Kafka. You can run the following command to examine the content inside the orders topic.

docker-compose exec kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 --topic orders --from-beginning

You will see JSON formatted records coming out of Kafka like this.

{
  "_airbyte_ab_id": "c3441ff2-fa23-49da-a7a2-82c7c4ecae4f",
  "_airbyte_stream": "orders",
  "_airbyte_emitted_at": 1646400487271,
  "_airbyte_data": {
    "id": 1,
    "store_id": 100,
    "order_date": "2021-08-15",
    "channel": "STORE",
    "country": "Hungary",
    "total": 173.0399932861328,
    "status": "ACTIVE",
    "_ab_cdc_updated_at": "1970-01-01T00:00:00Z",
    "_ab_cdc_log_file": "binlog.000021",
    "_ab_cdc_log_pos": 156,
    "_ab_cdc_deleted_at": null
  }
}

Starting from the first job, Airbyte runs a sync every five minutes to pick up new or updated orders from MySQL and write them to Kafka.

Step 3: Sync data from Kafka to Pinot

Now that our orders are showing up in Kafka, the next step is to ingest them into Apache Pinot so that the dashboard can run analytical queries.

Pinot is a distributed system made of different components responsible for data ingestion, data storage, and query brokering. Pinot also depends on Zookeeper for metadata storage and cluster coordination.

If you remember, we started Kafka, Zookeeper, and the rest of the Pinot components as Docker containers in the prerequisites. That simplifies many things for us. However, you can follow this guide if you want to set up a Pinot cluster manually.

Create a schema and a table for orders

Before ingesting the incoming stream of orders, Pinot requires you to define a structure for the stream beforehand. That enables Pinot to optimize its storage and indexing strategies to provide faster data analytics.

We achieve this by creating a schema and a table for the orders data. A schema provides a logical abstraction for the underlying data, declaring attributes, data types, and other constraints. A table implements a schema, specifying concrete information on data ingestion, indexing, and storage requirements. You can think of a schema as a template for a table.

Below is the schema for the orders table, which you can find inside the config folder of the GitHub repo. Schema attributes are partitioned into three sections: dimensions, metrics, and datetime fields. Aggregations such as counts, totals, and averages are performed on metric fields, while temporal filtering and sorting are performed on datetime fields. The rest of the attributes fall under dimensions.

{
  "schemaName": "orders",
  "primaryKeyColumns": [
    "id"
  ],
  "dimensionFieldSpecs": [
    {
      "name": "id",
      "dataType": "INT"
    },
    {
      "name": "store_id",
      "dataType": "INT"
    },
    {
      "name": "channel",
      "dataType": "STRING"
    },
    {
      "name": "country",
      "dataType": "STRING"
    },
    {
      "name": "status",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "total",
      "dataType": "FLOAT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "order_date",
      "dataType": "STRING",
      "format": "1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd",
      "granularity": "1:DAYS"
    }
  ]
}

The following is the table definition for a REALTIME table. The streamConfigs block specifies the Kafka settings required for real-time data ingestion.

{
  "tableName": "orders",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "order_date",
    "schemaName": "orders",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    },
    "transformConfigs": [
      { "columnName": "id", "transformFunction": "JSONPATHLONG(_airbyte_data, '$.id')" },
      { "columnName": "store_id", "transformFunction": "JSONPATHLONG(_airbyte_data, '$.store_id')" },
      { "columnName": "channel", "transformFunction": "JSONPATHSTRING(_airbyte_data, '$.channel')" },
      { "columnName": "country", "transformFunction": "JSONPATHSTRING(_airbyte_data, '$.country')" },
      { "columnName": "total", "transformFunction": "JSONPATHDOUBLE(_airbyte_data, '$.total')" },
      { "columnName": "status", "transformFunction": "JSONPATHSTRING(_airbyte_data, '$.status')" },
      { "columnName": "order_date", "transformFunction": "JSONPATHSTRING(_airbyte_data, '$.order_date')" }
    ]
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "orders",
      "stream.kafka.broker.list": "kafka:9093",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
    }
  },
  "tenants": {},
  "metadata": {},
  "routing": {
    "instanceSelectorType": "strictReplicaGroup"
  },
  "upsertConfig": {
    "mode": "FULL"
  }
}

Transform ingested data

The sync records written to Kafka are formatted in JSON according to the Airbyte specification. Therefore, during ingestion, each Kafka event is flattened and mapped to the corresponding schema attributes. That happens inside the transformConfigs block shown above.

Handle duplicate records with upserts

In reality, an order can transition through many lifecycle stages, such as OPEN, PROCESSING, IN_TRANSIT, and CANCELLED. We have defined the status field in MySQL to capture that. Order status changes are captured by Airbyte and published to Kafka, ultimately ending up in Pinot as duplicated order entries. But we know that they all belong to the same order.

To avoid that, we can enable upserts on the orders table, a feature that merges data records bearing the same primary key. We have already defined id as the primary key in the orders schema. The following configuration block in the above table definition defines a FULL upsert on the orders table, overwriting the old order entirely with the latest arriving order record.

"upsertConfig": {
"mode": "FULL"
}

Finally, run the following command to create the orders schema and table inside Pinot.

docker-compose exec pinot-controller bin/pinot-admin.sh AddTable -schemaFile /config/orders_schema.json -tableConfigFile /config/orders_table.json -exec

Verify the Kafka to Pinot ingestion

Pinot starts ingesting from the orders topic as soon as the command completes and populates the orders table with incoming orders.

Pinot query console with the orders table
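
You can also confirm that the table was registered by asking the Pinot controller's REST API, assuming the controller's port 9000 is exposed on the host:

curl http://localhost:9000/tables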

We can also verify the upserts feature by updating the status of an order in MySQL.

update orders set status='CANCELLED' where id=10;

Running the following query in Pinot will return the order record with its status field updated to CANCELLED.

select * from orders where id = 10;

Write queries to analyze orders data

Now that we have the orders ingested into Pinot, we can run some ad-hoc SQL queries to find answers to the questions we had at the beginning. Let's use the integrated Pinot query console for that.

In our example, each order has a store_id field to represent the store it belongs to. When a seller logs into the dashboard, orders can be filtered by their store_id. For now, let's use 100 as a sample store_id.

This query returns the total sales for the past day (orders whose order_date falls within the last 86400000 milliseconds, i.e., 24 hours).

select sum(total) as total_sales
from orders
where store_id=100 and
ToEpochSeconds(FromDateTime(order_date, 'yyyy-MM-dd')) > ToEpochSeconds(now() - 86400000)

This query returns the contribution of each sales channel based on its revenue over the same period.

select channel, sum(total) as total_sales
from orders
where store_id=100 and
ToEpochSeconds(FromDateTime(order_date, 'yyyy-MM-dd')) > ToEpochSeconds(now() - 86400000)
group by channel
order by total_sales desc

And this returns the average order value for the past day.

select avg(total) as avg_order_value
from orders
where store_id=100 and
ToEpochSeconds(FromDateTime(order_date, 'yyyy-MM-dd')) > ToEpochSeconds(now() - 86400000)

Step 4: Build a user-facing analytics dashboard

We have now done the hardest part of the solution: moving orders from MySQL to Pinot. With data ingested into Pinot, a user-facing analytics dashboard can be built with any front-end technology such as React, Node.js, or even Python. The dashboard then pulls data from Pinot through its REST API or the appropriate driver interfaces.

The Pinot REST API allows you to submit any of the SQL queries we executed above as an HTTP POST request. In return, you will get a JSON formatted response. For example, to query total sales:

curl -H "Content-Type: application/json" -X POST -d '{"sql":"select sum(total) as total_sales from orders"}' http://localhost:8000/query/sql

Building the dashboard goes beyond the scope of this article. Therefore, we will look at it in a future article.

Summary

A user-facing analytics dashboard requires running complex OLAP queries on the underlying data set. It is not recommended to run them on operational OLTP databases, as doing so can degrade their performance. Hence, the operational data must be moved to an OLAP database.

In this article, we learned how to build a real-time analytics pipeline that uses Airbyte to move e-commerce orders from MySQL to Kafka. From Kafka, orders are streamed into Apache Pinot, which answers low-latency OLAP queries coming from the user-facing dashboard. As a result, sellers are presented with real-time store analytics they can use for informed decision making.
