Streaming CDC data from MySql to Apache Iceberg with Hive Metastore using Apache Flink.

Pranav Sadagopan
Dev Genius
Published in
10 min readJan 15, 2024

--

This project demonstrates Real-Time streaming of CDC data from MySql to Apache Iceberg using Flink SQL Client for faster data analytics and machine learning workloads.

What is Apache Iceberg ?

Apache Iceberg is an open-source framework for organizing large-scale data in data lakes, offering seamless schema evolution and transactional writes. It ensures consistency with snapshot isolation, supports efficient metadata management, and enables time travel for historical data analysis. Iceberg is compatible with various storage systems, providing flexibility for diverse data lake environments.

Tools Used :

  1. Trino: High-performance query engine for distributed data processing.
  2. Apache Flink: Robust stream processing framework for real-time data analytics.
  3. Apache Iceberg: Open-source table format and processing framework for efficient data lake management.
  4. Hive Metastore: Schema management tool ensuring seamless evolution and organization of data.
  5. Minio: Secure object storage solution for reliable data storage in distributed environments.

Setting up the project :

To initialize the setup, ensure that you have Docker and Docker Compose installed on your system.

https://github.com/pranav1699/flink-iceberg-minio-trino

Please clone the repository at https://github.com/pranav1699/flink-iceberg-minio-trino to obtain the code. After cloning, navigate to the repository directory and initiate the Docker containers using the provided Docker Compose file.

git clone https://github.com/pranav1699/flink-iceberg-minio-trino.git
cd flink-iceberg-minio-trino
docker-compose up -d

Source Database and Tables Overview

We are going to use MySql as the source database to steam the CDC data.

These are the tables that are available in the inventory database.

mysql> use inventory ;

Database changed
mysql> show tables ;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses |
| customers |
| geom |
| orders |
| products |
| products_on_hand |
+---------------------+
6 rows in set (0.00 sec)

Here in the demo we are going to use customers and orders tables for data processing and streaming.

mysql> describe customers ;
+------------+--------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+------------+--------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| first_name | varchar(255) | NO | | NULL | |
| last_name | varchar(255) | NO | | NULL | |
| email | varchar(255) | NO | UNI | NULL | |
+------------+--------------+------+-----+---------+----------------+
4 rows in set (0.25 sec)
mysql> describe orders ;
+--------------+---------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+--------------+---------+------+-----+---------+----------------+
| order_number | int(11) | NO | PRI | NULL | auto_increment |
| order_date | date | NO | | NULL | |
| purchaser | int(11) | NO | MUL | NULL | |
| quantity | int(11) | NO | | NULL | |
| product_id | int(11) | NO | MUL | NULL | |
+--------------+---------+------+-----+---------+----------------+
5 rows in set (0.01 sec)

Setting up Flink CDC Connectors for MySQL: Creating Tables for Change Data Capture (CDC)

We are going to use mysql-cdc connector from ververica to capture the changes from mysql. GitHub — ververica/flink-cdc-connectors: CDC Connectors for Apache Flink®

To initiate the Flink SQL client, access the Flink TaskManager container and launch the SQL client from within. Utilize the following Docker command, employing docker exec, and locate the sql-client.sh script within the bin folder:

docker exec -it <flink_taskmanager_container_id> /opt/flink/bin/sql-client.sh

Replace <flink_taskmanager_container_id> with the actual ID or name of your Flink TaskManager container.

Now we are going to create the flink cdc table for both customers and orders table as customers_source and orders_source

CREATE TABLE customers_source (
`id` int NOT NULL,
first_name STRING,
last_name STRING,
email STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'inventory',
'table-name' = 'customers'
);
 CREATE TABLE orders_source (
`order_number` int NOT NULL,
order_date date NOT NULL,
purchaser int NOT NULL,
quantity int NOT NULL,
product_id int,
PRIMARY KEY(`order_number`) NOT ENFORCED
)WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3307',
'username' = 'root',
'password' = '123456',
'database-name' = 'inventory',
'table-name' = 'orders'
);

Creating Flink Table for Iceberg with Hive Metastore and Minio Storage Layer

We will now generate a Flink table for Iceberg, employing Hive Metastore as the catalog and Minio as the storage layer.

we can see the bucket datalake in minio console where we are going to store the data.

Now lets create the iceberg tables for the customers and orders as customers_iceberg and orders_iceberg

create table customers_iceberg
with (
'connector'='iceberg',
'catalog-name'='iceberg_catalog',
'catalog-type'='hive',
'uri'='thrift://192.168.1.8:9083',
'warehouse'='s3a://datalake/warehouse',
'format-version'='2',
's3.endpoint'='http://192.168.1.8:9000',
'io-impl'='org.apache.iceberg.aws.s3.S3FileIO')
LIKE customers_source (EXCLUDING OPTIONS);
create table orders_iceberg
with (
'connector'='iceberg',
'catalog-name'='iceberg_catalog',
'catalog-type'='hive',
'uri'='thrift://192.168.1.8:9083',
'warehouse'='s3a://datalake/warehouse',
'format-version'='2',
's3.endpoint'='http://192.168.1.8:9000',
'io-impl'='org.apache.iceberg.aws.s3.S3FileIO')
LIKE orders_source (EXCLUDING OPTIONS);

Here we dont need to specify the schema as we are using the CREATE TABLE LIKE to create a table with the same schema as source tables.

Initiating Streaming Pipeline for MySQL to Iceberg Sync with Flink

Now we are going to start the streaming pipeline to sync the streaming data changes from MySql to Iceberg.

First we need to set the checkpointing interval.

set 'execution.checkpointing.interval'='3000' ;

we are going to use INSERT INTO statement to UPSERT the streaming changes.

insert into customers_iceberg /*+ OPTIONS('upsert-enabled'='true','format-version'='2') */
select * from customers_source ;
insert into orders_iceberg /*+ OPTIONS('upsert-enabled'='true','format-version'='2') */
select * from orders_source ;

Now these jobs are created and you can see the execution plan in Apache flink dashboard.

Now lets try to join orders_source and customers_source to create a table customer_orders to store the enriched the data.

create table customer_orders (
`order_number` int NOT NULL,
order_date date NOT NULL,
purchaser VARCHAR NOT NULL,
quantity int NOT NULL,
product_id int,
PRIMARY KEY(`order_number`,order_date) NOT ENFORCED
)
PARTITIONED by (order_date)
with (
'connector'='iceberg',
'catalog-name'='iceberg_catalog',
'catalog-type'='hive',
'uri'='thrift://192.168.1.8:9083',
'warehouse'='s3a://datalake/warehouse',
'format-version'='2',
's3.endpoint'='http://192.168.1.8:9000',
'io-impl'='org.apache.iceberg.aws.s3.S3FileIO');

Here are are partitioning the table based on order_date and order_number

Note : In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields.

insert into customer_orders /*+ OPTIONS('upsert-enabled'='true','format-version'='2') */
select o.order_number , o.order_date,
concat(c.first_name,' ',c.last_name) as purchaser, o.quantity, o.product_id
from orders_source o
join customers_source c on o.purchaser = c.id ;

Now we can see the tables that we creates and the data is stored in minio

Querying Iceberg Data with Trino and Performing Inserts/Updates in MySQL

Now lets try to query these data using trino and try to insert and update the record in MySql, we have already created a catalog for iceberg in trino

Now lets query all these customers_iceberg and orders_icerberg tables

trino> select * from iceberg.default_database.customers_iceberg ;
id | first_name | last_name | email
------+------------+-----------+-----------------------
1001 | Sally | Thomas | sally.thomas@acme.com
1002 | George | Bailey | gbailey@foobar.com
1003 | Edward | Walker | ed@walker.com
1004 | Anne | Kretchmar | annek@noanswer.org
(4 rows)

Query 20240113_141611_00000_nrbwf, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
10.69 [4 rows, 1.34KB] [0 rows/s, 129B/s]
trino> select * from iceberg.default_database.orders_iceberg ;
order_number | order_date | purchaser | quantity | product_id
--------------+------------+-----------+----------+------------
10001 | 2016-01-16 | 1001 | 1 | 102
10002 | 2016-01-17 | 1002 | 2 | 105
10003 | 2016-02-19 | 1002 | 2 | 106
10004 | 2016-02-21 | 1003 | 1 | 107
(4 rows)

Query 20240113_142321_00001_nrbwf, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.49 [4 rows, 1.49KB] [8 rows/s, 3.01KB/s]

Now lets insert a record in customers and orders table in MySql.

mysql> insert into customers (first_name,last_name,email) values ('pranav','sadagopan','ps@email.com') ;
Query OK, 1 row affected (0.96 sec)
mysql> insert into orders (order_date,purchaser,quantity,product_id ) values ('2023-12-21', 1005, 10, 106) ;
Query OK, 1 row affected (0.04 sec)

Now lets check the iceberg tables to see the inserted data.

trino> select * from iceberg.default_database.customers_iceberg ;

id | first_name | last_name | email
------+------------+-----------+-----------------------
1005 | pranav | sadagopan | ps@email.com
1001 | Sally | Thomas | sally.thomas@acme.com
1002 | George | Bailey | gbailey@foobar.com
1003 | Edward | Walker | ed@walker.com
1004 | Anne | Kretchmar | annek@noanswer.org
(5 rows)

Query 20240113_174442_00006_nrbwf, FINISHED, 1 node
Splits: 2 total, 2 done (100.00%)
1.30 [5 rows, 2.62KB] [3 rows/s, 2.01KB/s]
trino> select * from iceberg.default_database.orders_iceberg ;

order_number | order_date | purchaser | quantity | product_id
--------------+------------+-----------+----------+------------
10005 | 2023-12-21 | 1005 | 10 | 106
10001 | 2016-01-16 | 1001 | 1 | 102
10002 | 2016-01-17 | 1002 | 2 | 105
10003 | 2016-02-19 | 1002 | 2 | 106
10004 | 2016-02-21 | 1003 | 1 | 107
(5 rows)

Query 20240113_164131_00004_nrbwf, FINISHED, 1 node
Splits: 2 total, 2 done (100.00%)
4.45 [5 rows, 2.89KB] [1 rows/s, 665B/s]

Great, It captured the change and It inserted the new records in Iceberg tables.

Now lets try to update the records.

mysql> update customers set email = 'example@email.com' where id = 1005 ;
Query OK, 0 rows affected (0.01 sec)
Rows matched: 1 Changed: 0 Warnings: 0
mysql> update orders set quantity = 100 where purchaser = 1005 ;
Query OK, 1 row affected (0.03 sec)
Rows matched: 1 Changed: 1 Warnings: 0

Now lets check the iceberg tables to see the updated data.

trino> select * from iceberg.default_database.customers_iceberg ;
id | first_name | last_name | email
------+------------+-----------+-----------------------
1005 | pranav | sadagopan | example@email.com
1001 | Sally | Thomas | sally.thomas@acme.com
1002 | George | Bailey | gbailey@foobar.com
1003 | Edward | Walker | ed@walker.com
1004 | Anne | Kretchmar | annek@noanswer.org
(5 rows)

Query 20240113_175058_00007_nrbwf, FINISHED, 1 node
Splits: 3 total, 3 done (100.00%)
0.61 [5 rows, 3.93KB] [8 rows/s, 6.46KB/s]
trino> select * from iceberg.default_database.orders_iceberg ;
order_number | order_date | purchaser | quantity | product_id
--------------+------------+-----------+----------+------------
10005 | 2023-12-21 | 1005 | 100 | 106
10001 | 2016-01-16 | 1001 | 1 | 102
10002 | 2016-01-17 | 1002 | 2 | 105
10003 | 2016-02-19 | 1002 | 2 | 106
10004 | 2016-02-21 | 1003 | 1 | 107
(5 rows)

Query 20240113_175106_00008_nrbwf, FINISHED, 1 node
Splits: 3 total, 3 done (100.00%)
0.47 [5 rows, 4.3KB] [10 rows/s, 9.08KB/s]

Great, It captured the change and It updates records in Iceberg tables.

Now lets see the enriched customers_orders table.

trino> select * from iceberg.default_database.customer_orders ;
order_number | order_date | purchaser | quantity | product_id
--------------+------------+------------------+----------+------------
10002 | 2016-01-17 | George Bailey | 2 | 105
10001 | 2016-01-16 | Sally Thomas | 1 | 102
10003 | 2016-02-19 | George Bailey | 2 | 106
10005 | 2023-12-21 | pranav sadagopan | 100 | 106
10004 | 2016-02-21 | Edward Walker | 1 | 107
(5 rows)

Query 20240113_175336_00010_nrbwf, FINISHED, 1 node
Splits: 7 total, 7 done (100.00%)
0.35 [5 rows, 10.4KB] [14 rows/s, 29.8KB/s]

Exploring Metadata Queries

History :

trino:default_database> select * from "customer_orders$history" ;
made_current_at | snapshot_id | parent_id | is_current_ancestor
-----------------------------+---------------------+---------------------+---------------------
2024-01-13 14:44:53.424 UTC | 965236329886383639 | NULL | true
2024-01-13 14:45:22.907 UTC | 7356488553005171258 | 965236329886383639 | true
2024-01-13 14:45:52.906 UTC | 2822797807621626545 | 7356488553005171258 | true
2024-01-13 14:46:22.907 UTC | 5430267644996856462 | 2822797807621626545 | true
2024-01-13 14:46:52.893 UTC | 3445652634772310842 | 5430267644996856462 | true
2024-01-13 14:47:22.887 UTC | 8930917361927991712 | 3445652634772310842 | trueSnapsho

Snapshot:

trino:default_database> select * from "customer_orders$snapshots" ;
committed_at | snapshot_id | parent_id | operation | manifest>
-----------------------------+---------------------+---------------------+-----------+----------------------------------------------------------------------->
2024-01-13 14:44:53.424 UTC | 965236329886383639 | NULL | overwrite | s3a://datalake/warehouse/default_database.db/customer_orders/metadata/>
2024-01-13 14:45:22.907 UTC | 7356488553005171258 | 965236329886383639 | append | s3a://datalake/warehouse/default_database.db/customer_orders/metadata/>
2024-01-13 14:45:52.906 UTC | 2822797807621626545 | 7356488553005171258 | append | s3a://datalake/warehouse/default_database.db/customer_orders/metadata/>
2024-01-13 14:46:22.907 UTC | 5430267644996856462 | 2822797807621626545 | append | s3a://datalake/warehouse/default_database.db/customer_orders/metadata/>
2024-01-13 14:46:52.893 UTC | 3445652634772310842 | 5430267644996856462 | append | s3a://datalake/warehouse/default_database.db/customer_orders/metadata/>
2024-01-13 14:47:22.887 UTC | 8930917361927991712 | 3445652634772310842 | append | s3a://datalake/warehouse/default_database.db/customer_orders/metadata/>
2024-01-13 14:47:52.875 UTC | 2470895016716335077 | 8930917361927991712 | append | s3a://datalake/warehouse/default_database.db/customer_orders/metadata/>

Conclusion

In wrapping up our journey through real-time data streaming, we combined Flink, MySQL CDC connectors, Iceberg, Minio, Hive Metastore, and Trino for a robust solution. Flink orchestrated a streaming pipeline to sync live data changes from MySQL to Iceberg, creating an efficient storage foundation. With a well-structured Iceberg table backed by Hive Metastore and Minio, our data management became seamless. Trino showcased its powers by smoothly querying Iceberg data, adding a layer of versatility. The Apache Flink dashboard provided insights into the streaming job efficiency. Lastly, our ability to insert and update MySQL records from Trino added interactivity, emphasizing the agility and power of modern data tools for responsive analytics.

Connect with me :

Twitter — https://twitter.com/pranavsadagopan

LinkedIn — https://www.linkedin.com/in/pranav-sadagopan/

--

--