DataOps 01: Stream data ingestion with Redpanda

Ong Xuan Hong
10 min read · Feb 11, 2023


In the article What’s Next for Data Engineering in 2023? 7 Predictions, one of the predictions I agree with is Prediction #6: data warehouse and data lake use cases start to blur, meaning that we will gradually bring structured and unstructured data into the same place.

When building a data platform, you have surely heard of the Kappa and Lambda architecture models. In the Lambda architecture, batch data and stream data are split into two branches for storage, processing, and querying, which keeps each path easy to reason about but increases the Data Engineer's workload because two systems must be maintained at the same time. In contrast, the Kappa architecture aims to consolidate batch and stream into a single path and then uses a federated query mechanism to store, process, and query the data in one place.

We still design and build Lambda more often than Kappa, even though Kappa brings more value in terms of speed as well as the ability to integrate real-time microservices. This is because people are reluctant to adopt Kappa: it requires a very skillful team behind the complex event-driven hub infrastructure to administer it and keep the system running 24/7.

Kafka is the most senior distributed event streaming platform out there, and it set the standard for the event streaming systems that followed. Many businesses have successfully applied Kafka in operations, from real-time reporting and analytics and data migration to high-end AI applications. However, to build and run Kafka from the ground up you need a DevOps team with very strong technical skills to keep it running smoothly. So are there alternatives with similar powers that are more approachable?

Redpanda is one of them. It is not only easier to manage, but the cost of building the platform is also lower, so the system admin's job is less difficult. The two main design choices that make Redpanda up to 10x faster and 6x lower in cost are the removal of ZooKeeper and a Raft implementation written in C++, so it never touches the JVM. Redpanda has been validated experimentally with 200 hours of test runs. In addition, Redpanda is fully compatible with the Kafka ecosystem, including Kafka Connect.

In this article, I will install Redpanda as a broker and use it for data ingestion. You can use the GitHub repository here to reproduce everything: https://github.com/ongxuanhong/de01-stream-ingestion-redpanda-minio

  • The data source is MySQL, which emulates the operational data of the business, specifically the users' order transactions. Besides that, there are clickstream events that simulate how users interact with the e-commerce website.
  • For the target sink we could use S3, GCS, or Azure Blob as distributed storage. However, I want the article to be easy to set up for most readers, without creating an account at a cloud provider, so I decided to use MinIO instead.
  • To transport data from source to sink, I install Debezium for MySQL and Kafka Connect for MinIO.

1. Data sources

1.1 MySQL — operational data

We use docker-compose.yml to build the MySQL service

mysql:
  image: debezium/example-mysql:1.6
  container_name: mysql
  volumes:
    - ./mysql/data:/var/lib/mysql
  ports:
    - "3306:3306"
  env_file:
    - .env

along with the .env file below

MYSQL_ROOT_PASSWORD="debezium"
MYSQL_USER="admin"
MYSQL_PASSWORD="admin123"

Here, we use a Docker image named debezium/example-mysql:1.6, which already has the CDC settings enabled so that Debezium can capture data changes. When you want to set up CDC for your own MySQL, you can refer to this link.
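If you want to double-check that the image is really CDC-ready, you can query the binlog variables yourself. Below is a small sketch using pymysql (an extra dependency, not part of the repository) and the root credentials from the .env file above.

import pymysql

# Connect with the root credentials defined in .env (MYSQL_ROOT_PASSWORD).
conn = pymysql.connect(host="127.0.0.1", port=3306, user="root", password="debezium")
try:
    with conn.cursor() as cur:
        # Debezium needs the binary log enabled and in ROW format.
        cur.execute("SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format')")
        for name, value in cur.fetchall():
            print(f"{name} = {value}")  # expect log_bin=ON and binlog_format=ROW
finally:
    conn.close()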

Because I don’t want to waste time typing many commands, I have prepared a Makefile containing shortcuts for the commands used to interact with the system.

include .env

build:
	docker-compose build

up:
	docker-compose --env-file .env up -d

down:
	docker-compose --env-file .env down

ps:
	docker ps --format "table {{.Names}}\t{{.Status}}\t{{.Ports}}"

to_redpanda:
	open http://localhost:8080/topics

to_minio:
	open http://localhost:9001/buckets

to_mysql:
	docker exec -it mysql mysql -u"root" -p"${MYSQL_ROOT_PASSWORD}" ${MYSQL_DATABASE}

to_data_generator:
	docker exec -it data_generator /bin/bash

We proceed to create the database brazillian_ecommerce with the table olist_orders_dataset to store order transaction data. You can register on Kaggle to download all of the data; for convenience, I've made it available on GitHub. Next, we run make up to build the MySQL service and make to_mysql to access MySQL, then execute the statements below.

mysql> create database brazillian_ecommerce;
Query OK, 1 row affected (0.00 sec)
mysql> use brazillian_ecommerce;
Database changed
mysql> CREATE TABLE olist_orders_dataset (
-> order_id varchar(32),
-> customer_id varchar(32),
-> order_status varchar(16),
-> order_purchase_timestamp varchar(32),
-> order_approved_at varchar(32),
-> order_delivered_carrier_date varchar(32),
-> order_delivered_customer_date varchar(32),
-> order_estimated_delivery_date varchar(32),
-> PRIMARY KEY (order_id)
-> );
Query OK, 0 rows affected (0.01 sec)
mysql> show tables;
+--------------------------------+
| Tables_in_brazillian_ecommerce |
+--------------------------------+
| olist_orders_dataset           |
+--------------------------------+
1 row in set (0.00 sec)

Next, we create src/ containing scripts that generate transaction data with the following files/folder structure:

src
├── 01_generate_orders.py
├── 02_generate_clickstream.py
├── data
│   └── olist_orders_dataset.csv
├── Dockerfile
├── requirements.txt
└── setup_connectors.sh
  • 01_generate_orders.py: used to generate transaction data for MySQL (see the sketch after this list).
  • 02_generate_clickstream.py: used to generate clickstream event data.
  • data/olist_orders_dataset.csv: contains the transaction data.
  • Dockerfile: used to package all the src/ code and the necessary requirements.
  • requirements.txt: contains the list of libraries to install.
  • setup_connectors.sh: contains the requests used to create source/sink connections for Kafka Connect.
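For orientation, here is a rough sketch of what 01_generate_orders.py does. It is not the repository's exact code, and it assumes pandas plus SQLAlchemy with a MySQL driver are listed in requirements.txt; the idea is simply to replay the Kaggle orders into MySQL one purchase date at a time, so Debezium later sees a steady stream of inserts.

import pandas as pd
from sqlalchemy import create_engine

# Inside the data_generator container the MySQL host is simply "mysql".
engine = create_engine("mysql+pymysql://root:debezium@mysql:3306/brazillian_ecommerce")

df = pd.read_csv("data/olist_orders_dataset.csv")
print(df.shape)

# Replay the orders day by day, based on the purchase timestamp.
df["purchase_date"] = pd.to_datetime(df["order_purchase_timestamp"]).dt.date
dates = sorted(df["purchase_date"].unique())
print("NO. DATES:", len(dates))

for day in dates:
    batch = df[df["purchase_date"] == day].drop(columns=["purchase_date"])
    print("Writing data on:", day)
    print("-Records:", len(batch))
    batch.to_sql("olist_orders_dataset", engine, if_exists="append", index=False)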

To build a docker image for src/ we add the following declarations to docker-compose.yml

data-generator:
  build:
    context: ./src
    dockerfile: ./Dockerfile
  container_name: data_generator
  volumes:
    - ./src:/opt/src
  env_file:
    - .env

Then, we use make to generate transaction data for MySQL

make build
make down
make up
make to_data_generator

After entering the data_generator container, we use Python to run the script 01_generate_orders.py. If it succeeds, we will see output similar to the one below.

(74254, 9)
NO. DATES: 366
Writing data on: 2017-08-01
-Records: 165
Writing data on: 2017-08-02
-Records: 157
Writing data on: 2017-08-03
-Records: 148

We return to MySQL to check the generated data

select * from olist_orders_dataset limit 10;

2. Ingestion layer

2.1 Redpanda — fast storage for real-time data streaming

With CDC in place on the MySQL side, we install Redpanda to receive these streaming events. We add the following declarations to docker-compose.yml

redpanda:
  image: vectorized/redpanda
  container_name: redpanda
  ports:
    - "9092:9092"
    - "29092:29092"
  command:
    - redpanda
    - start
    - --overprovisioned
    - --smp
    - "1"
    - --memory
    - "1G"
    - --reserve-memory
    - "0M"
    - --node-id
    - "0"
    - --kafka-addr
    - PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
    - --advertise-kafka-addr
    - PLAINTEXT://redpanda:29092,OUTSIDE://redpanda:9092
    - --check=false

redpanda-console:
  image: vectorized/console
  container_name: redpanda_console
  depends_on:
    - redpanda
  ports:
    - "8080:8080"
  env_file:
    - .env

We restart the services with make down && make up. Once they are up, we open http://localhost:8080/topics and see the Redpanda console interface below.

Redpanda UI
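Besides the console, we can also talk to the broker over the plain Kafka protocol. The sketch below lists the current topics with kafka-python; run it inside the data_generator container (make to_data_generator), where the redpanda hostname resolves, and it assumes kafka-python is among the installed requirements.

from kafka import KafkaConsumer

# Connect to the broker and print the set of topics it currently knows about.
consumer = KafkaConsumer(bootstrap_servers="redpanda:9092")
print(sorted(consumer.topics()))
consumer.close()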

2.2 Kafka connect — transfer data from source to sink

We could completely hand-code everything from scratch: create topics on Redpanda to hold the streaming events, write scripts that read data from those topics, and save the results to MinIO. Instead, Kafka Connect offers a large catalog of connectors for moving data from source to sink, sparing us that complicated setup and letting us focus on configuring and running the job. To show how much plumbing the connector replaces, a hand-rolled version is sketched below.
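Purely for illustration, this is roughly what a hand-rolled sink would look like: consume a topic, buffer records, and upload the batches to MinIO yourself. It is a bare sketch, assuming kafka-python and boto3, with none of the offset management, retries, or partitioned file layout that the S3 sink connector handles for us.

import boto3
from kafka import KafkaConsumer

# MinIO exposes an S3-compatible API (endpoint and credentials are set up in section 3).
s3 = boto3.client("s3", endpoint_url="http://minio:9000",
                  aws_access_key_id="minio", aws_secret_access_key="minio123")

# Consume the CDC topic that Debezium creates in section 2.3.
consumer = KafkaConsumer("dbserver1.brazillian_ecommerce.olist_orders_dataset",
                         bootstrap_servers="redpanda:9092",
                         auto_offset_reset="earliest")

buffer, part = [], 0
for message in consumer:
    buffer.append(message.value.decode("utf-8"))
    if len(buffer) >= 1000:  # write one file per 1000 records
        key = f"handmade/olist_orders/part-{part:05d}.json"
        s3.put_object(Bucket="warehouse", Key=key, Body="\n".join(buffer).encode("utf-8"))
        buffer, part = [], part + 1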

The default Kafka Connect Docker image does not ship with io.confluent.connect.s3.S3SinkConnector, so we need to create a kafka/ folder with a Dockerfile that downloads this connector for Kafka Connect.

FROM debezium/connect

RUN curl -O https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-s3/versions/10.3.1/confluentinc-kafka-connect-s3-10.3.1.zip \
&& unzip confluentinc-kafka-connect-s3-10.3.1.zip \
&& mv confluentinc-kafka-connect-s3-10.3.1 /kafka/connect/ \
&& rm confluentinc-kafka-connect-s3-10.3.1.zip

We add the following declarations to docker-compose.yml

kafka-connect:
  build:
    context: ./kafka
    dockerfile: ./Dockerfile
  container_name: kafka_connect
  depends_on:
    - redpanda
  ports:
    - "8083:8083"
  env_file:
    - .env

After make build && make down && make up, we test the Kafka Connect service by sending a request to localhost:8083/connector-plugins/

# health check Kafka connect
curl -H "Accept:application/json" localhost:8083/connector-plugins/
[
  {
    "class": "io.confluent.connect.s3.S3SinkConnector",
    "type": "sink",
    "version": "10.3.1"
  },
  {
    "class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
    "type": "source",
    "version": "3.3.1"
  },
  {
    "class": "io.debezium.connector.db2.Db2Connector",
    "type": "source",
    "version": "2.0.1.Final"
  },
  {
    "class": "io.debezium.connector.mongodb.MongoDbConnector",
    "type": "source",
    "version": "2.0.1.Final"
  },
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "2.0.1.Final"
  },
  {
    "class": "io.debezium.connector.oracle.OracleConnector",
    "type": "source",
    "version": "2.0.1.Final"
  },
  {
    "class": "io.debezium.connector.postgresql.PostgresConnector",
    "type": "source",
    "version": "2.0.1.Final"
  },
  {
    "class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "type": "source",
    "version": "2.0.1.Final"
  },
  {
    "class": "io.debezium.connector.vitess.VitessConnector",
    "type": "source",
    "version": "2.0.1.Final"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "3.3.1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "3.3.1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "3.3.1"
  }
]

2.3 Debezium — read Change Data Capture from MySQL

To read the data out, we just need to create a connector for Kafka Connect. The connector class is io.debezium.connector.mysql.MySqlConnector. Detailed setup information can be found here.

curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
    "name": "src-brazillian-ecommerce",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.server.id": "184054",
      "database.include.list": "brazillian_ecommerce",
      "topic.prefix": "dbserver1",
      "schema.history.internal.kafka.bootstrap.servers": "redpanda:9092",
      "schema.history.internal.kafka.topic": "schema-changes.brazillian_ecommerce"
    }
  }'
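Before switching to the console, we can also ask the Kafka Connect REST API whether the connector came up healthy. A quick sketch with the requests library (an extra dependency on the host, not part of the repository):

import requests

# GET /connectors/<name>/status reports the connector and task states.
status = requests.get("http://localhost:8083/connectors/src-brazillian-ecommerce/status").json()
print(status["connector"]["state"])           # expect "RUNNING"
print([task["state"] for task in status["tasks"]])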

Once the connector has been created successfully, we return to the Redpanda console and see that the Debezium-related topics have been created automatically.

Checking the topic dbserver1.brazillian_ecommerce.olist_orders_dataset, we can see the full content of each CDC message. Based on this content, we can replicate the data to many different target sinks, such as a PostgreSQL database, a Redshift data warehouse, or a data lakehouse.

Contents of the key
Contents of value
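For readers following along without the screenshots, the value of a Debezium change event looks roughly like the Python dictionary below (abridged; the concrete field values are placeholders, and a schema section accompanies the payload when JSON schemas are enabled).

cdc_value = {
    "payload": {
        "before": None,          # row state before the change (None for an insert)
        "after": {               # row state after the change
            "order_id": "...",
            "customer_id": "...",
            "order_status": "delivered",
            # ... remaining olist_orders_dataset columns
        },
        "source": {"connector": "mysql",
                   "db": "brazillian_ecommerce",
                   "table": "olist_orders_dataset"},
        "op": "c",               # c = create, u = update, d = delete, r = snapshot read
        "ts_ms": 1676102400000,  # event timestamp in milliseconds
    }
}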

3. Target sink

3.1 MinIO — data lake for distributed data storage

Finally, the destination of the data is MinIO. We add the declarations below to docker-compose.yml

minio:
  hostname: minio
  image: "minio/minio"
  container_name: minio
  ports:
    - "9001:9001"
    - "9000:9000"
  command: [ "server", "/data", "--console-address", ":9001" ]
  volumes:
    - ./minio/data:/data
  env_file:
    - .env

mc:
  image: minio/mc
  container_name: mc
  hostname: mc
  environment:
    - AWS_ACCESS_KEY_ID=minio
    - AWS_SECRET_ACCESS_KEY=minio123
    - AWS_REGION=us-east-1
  entrypoint: >
    /bin/sh -c "
    until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo '...waiting...' && sleep 1; done;
    /usr/bin/mc mb minio/warehouse;
    /usr/bin/mc policy set public minio/warehouse;
    exit 0;
    "
  depends_on:
    - minio

along with the environment variables added to the .env file

# MinIO
MINIO_ROOT_USER="minio"
MINIO_ROOT_PASSWORD="minio123"
MINIO_ACCESS_KEY="minio"
MINIO_SECRET_KEY="minio123"

After make down && make up, we open http://localhost:9001/buckets and see the interface shown below.

MinIO UI

3.2 Sink CDC data into MinIO

We use Kafka Connect to sync data directly to MinIO. Detailed setup information for io.confluent.connect.s3.S3SinkConnector can be found here.

curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
    "name": "sink-s3-brazillian-ecommerce",
    "config": {
      "topics.regex": "dbserver1.brazillian_ecommerce.*",
      "topics.dir": "brazillian_ecommerce",
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "flush.size": "1000",
      "store.url": "http://minio:9000",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "s3.region": "us-east-1",
      "s3.bucket.name": "warehouse",
      "aws.access.key.id": "minio",
      "aws.secret.access.key": "minio123"
    }
  }'

Checking the Redpanda console, we will see that a new consumer group has been created

When the data synchronization is complete, we will see the synced files in MinIO

Folder to store topics of brazillian_ecommerce
The data is saved as JSON
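To verify this from the command line instead of the UI, we can list the objects through MinIO's S3-compatible API. A small sketch with boto3 (an extra dependency on the host), using the topics.dir prefix from the sink configuration:

import boto3

s3 = boto3.client("s3", endpoint_url="http://localhost:9000",
                  aws_access_key_id="minio", aws_secret_access_key="minio123")

# List what the sink connector has written under the brazillian_ecommerce prefix.
resp = s3.list_objects_v2(Bucket="warehouse", Prefix="brazillian_ecommerce/")
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])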

4. Same for clickstream data

4.1 Clickstream — user event data

We do the same by accessing the data_generator container via make to_data_generator, then using Python to run the script 02_generate_clickstream.py. When it succeeds, we will see logs like the ones below.

root@e478fba851b1:/opt/src# python 02_generate_clickstream.py
0.129.2. Sent data to Redpanda topic clickstream_events: b'9272c9373449fa586ea14f425b5497a7' - b'{"timestamp": "2017-08-01", "event_name": "video_play", "event_value": 0}', sleeping for 1 second
0.130.0. Sent data to Redpanda topic clickstream_events: b'bef3b7e0d09c81ece65cc174475bb2f8' - b'{"timestamp": "2017-08-01", "event_name": "link_2_click", "event_value": 1}', sleeping for 1 second
0.130.1. Sent data to Redpanda topic clickstream_events: b'bef3b7e0d09c81ece65cc174475bb2f8' - b'{"timestamp": "2017-08-01", "event_name": "pdf_download", "event_value": 0}', sleeping for 1 second
.
.
.
.
2.147.0. Sent data to Redpanda topic clickstream_events: b'97b02cd154ba83c7a105d797003cbff4' - b'{"timestamp": "2017-08-03", "event_name": "link_1_click", "event_value": 1}', sleeping for 3 second
2.147.1. Sent data to Redpanda topic clickstream_events: b'97b02cd154ba83c7a105d797003cbff4' - b'{"timestamp": "2017-08-03", "event_name": "link_2_click", "event_value": 0}', sleeping for 3 second
2.147.2. Sent data to Redpanda topic clickstream_events: b'97b02cd154ba83c7a105d797003cbff4' - b'{"timestamp": "2017-08-03", "event_name": "link_2_click", "event_value": 1}', sleeping for 3 second

At this point, checking the Redpanda console, we will see that a clickstream_events topic has been created, along with the events captured on the system.
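As a reference point, a clickstream producer in the spirit of 02_generate_clickstream.py could look like the sketch below. It is not the repository's exact code; it assumes kafka-python and hard-codes a single date, whereas the logs above show the real script walking through a range of dates.

import json
import random
import time
import uuid

from kafka import KafkaProducer

EVENTS = ["video_play", "link_1_click", "link_2_click", "pdf_download"]

producer = KafkaProducer(bootstrap_servers="redpanda:9092")

while True:
    user_id = uuid.uuid4().hex  # 32-character pseudo user id used as the message key
    event = {
        "timestamp": "2017-08-01",
        "event_name": random.choice(EVENTS),
        "event_value": random.randint(0, 1),
    }
    producer.send("clickstream_events",
                  key=user_id.encode("utf-8"),
                  value=json.dumps(event).encode("utf-8"))
    producer.flush()
    time.sleep(random.randint(1, 3))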

Finally, to sink this data into MinIO, we register another connector on Kafka Connect with the request below.

curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
    "name": "sink-s3-clickstream",
    "config": {
      "topics": "clickstream_events",
      "topics.dir": "clickstream_events",
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "key.converter.schemas.enable": "false",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false",
      "s3.compression.type": "gzip",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "flush.size": "100",
      "store.url": "http://minio:9000",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "s3.region": "us-east-1",
      "s3.bucket.name": "warehouse",
      "aws.access.key.id": "minio",
      "aws.secret.access.key": "minio123"
    }
  }'

And with that, the clickstream data has finally been synced to MinIO.

Conclusion

Migrating data from one system to another in a batch-ingestion fashion is very time-consuming and can slow down the source system, because we have to flood the database with requests to fetch new data. With one or two tables this is not significant, but in practice we need many more tables to do report analysis or serve Machine Learning models.

Stream data ingestion has more advantages, such as reducing the load on the source system because it only reads the change log from the stream. Thanks to the changes tracked through CDC, we can rely entirely on this source of truth to reproduce and recreate all of the data in many other systems.

Data migration is just the first step in bringing data to all of the Data Engineer's data consumers. The remaining work is to reconstruct the data from the CDC stream so that users see the complete data in the end. In the following articles, I will cover the two main algorithms commonly used to read this CDC data: Copy on Write and Merge on Read.
