Data Ingestion (Change Data Capture Demonstration)

Oladayo
CodeX
Published in
7 min readMar 23, 2023

Hi Everyone,

Welcome to the last part of the four-part series on data ingestion. You can read the first part here, the second part here and the third part here.

In this post, I will demonstrate the change data capture approach to data ingestion.

a simple diagram that depicts the demonstration architecture

I will be streaming data from a table in the PostgreSQL database to a table in the BigQuery dataset with the help of Kafka and connect clusters. The PostgreSQL, Kafka cluster and connect cluster will run in docker containers.

Requirements

Based on the demonstration architecture, the requirements to enable data ingestion via change data capture are:

  1. A docker desktop application.
  2. a docker-compose file that has the following services:
  • zookeeper: manages the distributed environment and handles configuration across each service in the Kafka cluster.
  • broker: a Kafka server that runs within a Kafka cluster.
  • connect: is used to connect the broker with the source system (PostgreSQL) and the sink system (BigQuery) so data can be easily streamed via the Kafka cluster. The connect service will include the source connector (Debezium PostgreSQL CDC Source Connector) and the sink connector (Google BigQuery Sink Connector).
  • schema-registry: keeps track of the structure of the messages sent and received across the Kafka cluster.
  • control-center: provides a web-based tool for managing and monitoring the Kafka broker.

The first four services (zookeeper, broker, connect and schema-registry) are crucial components of the Kafka system, while the fifth service (control center) is optional.

A place to find the docker-compose file that has all these services is the cp-all-in-one repository, provided by Confluent Inc, on GitHub.

Additional service needed outside the Kafka and connect cluster is:

  • postgresqlsource: provides an instance of the PostgreSQL relational database management system. This will serve as the source system for the data ingestion process. A user (root), password (root), and database (sourcedb) was defined in the environment.

The docker-compose file I will use for data ingestion, which includes the postgresqlsource service, can be found here along with some comments.

3. BigQuery: this will serve as the sink system for the data ingestion process.

Let’s create a dataset named demo in BigQuery for this purpose.

created a dataset named demo. It has no tables in it.

4. Two connector configuration files (one for the source and one for the sink).

The source connector configuration file provides instructions to the source connector on how to connect and read from the source system while the sink connector configuration file provides instructions to the sink connector on how to connect and write to the sink system.

Let’s get to the demonstration:

  1. start the services in the docker-compose file. To do that, I will navigate to the repository that contains the docker-compose file on the terminal and use the docker command below to start up the services.
docker-compose up -d
snapshot showing all the services (containers) are running

2. run a PostgreSQL client within the postgresqlsource container. To do that, I will execute the command below.

 docker exec -it postgresqlsource psql -U root sourcedb
running a PostgreSQL client within the postgresqlsource container

3. Let’s create a table named orders in the sourcedb database with two columns (ordersId and ordersValue) and insert random values.

orders table created and values inserted into the tables

4. Go to localhost:9021/ on your browser. This is the port for the control-center container.

control-center cluster overview
control-center cluster detailed overview

Let’s navigate to the Topics section to see the list of the topics in the Kafka cluster.

default Topics in the Kafka cluster

Let’s navigate to the Connect section and check out the list of connectors in Connect Clusters

connectors in the Connect Clusters

We have the two connectors we need. The PostgresConnector (Source) and the BigQuery (Sink).

5. Let’s upload the connector configuration files.

First the source connector configuration file. The file is a JSON file with the contents.

{
"name": "postgresql-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"plugin.name": "pgoutput",
"database.hostname": "postgresqlsource",
"database.port": "5432",
"database.user": "root",
"database.password": "root",
"database.dbname" : "sourcedb",
"database.server.name": "postgresql",
"topic.prefix": "postgresql"

}
}

You can read about the PostgreSQL Source Connector (Debezium) Configuration Properties

The source connector running
updated Topics showing a new topic named postgresql.public.orders

The Topics list has a new topic named postgresql.public.orders (postgresql is the topic prefix in the source connector configuration file, and orders is the table's name).

Let’s upload the sink connector configuration file. The file is a JSON file with the following contents

{
"name": "bigquery-sink",
"config": {
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"tasks.max": "1",
"topics": "postgresql.public.orders",
"sanitizeTopics": true,
"autoCreateTables": true,
"autoUpdateSchemas": true,
"schemaRegistryLocation":"http://schema-registry:8081",
"schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
"project": "dummy-surveillance-project",
"defaultDataset" : "demo",
"keyfile": "/service key.json",
"upsertEnabled": true,
"deleteEnabled": true,
"kafkaKeyFieldName": "orderId",
"transforms": "HoistFieldKey",
"transforms.HoistFieldKey.type": "org.apache.kafka.connect.transforms.HoistField$Key",
"transforms.HoistFieldKey.field": "orderId",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}

You can read about the Google BigQuery Sink Connector Configuration Properties

The sink connector running

6. When the source connector is uploaded and running, it does an initial snapshot of the orders table in the sourcedb database in PostgreSQL and produces the initial snapshot to the postgresql.public.orders topic.

When the sink connector is uploaded and running, it consumes the initial snapshot of the orders table from the topic.

So let’s check the demo dataset in BigQuery for any table, if there is one, let’s query the table.

tables in the demo dataset in BigQuery

Now, we have two tables in the demo dataset but we are only interested in the first table named postgresql_public_orders.

Let’s check for the columns in the table.

the columns in the postresql_public_orders table in BigQuery

We have quite a number of nested columns (column that has columns in them).

The reason for having this kind of data structure is that I used a Debezium source connector.

When using the Debezium source connector, it generates a Debezium envelope which contains the data from the source system as well as additional metadata.

We are only interested in two nested columns;

  1. the before column which has two columns in it (orderid and ordervalue)
  2. the after column which has two columns in it (orderid and ordervalue)

The orderid and ordervalue columns in the after column represent the current state of the two columns in the orders table in the source system database.

If an update is made to data records in the orders table in the source system, the initial state of the data records gets moved to the before column and the new update to the after column.

Let’s query those two columns.

postresql_public_orders table query result

Looking at the query result, the orderid and ordervalue columns in the after column represent the current state of the orderId and orderValue columns in the orders table in the source system database while the orderid and ordervalue columns in the before column are null which means no updates has been made to the data records.

Let’s insert more values to the orders table in the sourcedb database in PostgreSQL.

updated orders table with 3 new records with orderId 6, 7 and 8 added

Let’s query the postgresql_public_orders table in BigQuery

updated postresql_public_orders query result

Looking at the query result, the orderid and ordervalue columns in the after column represent the current state of the orderId and orderValue columns in the orders table in the source system database.

Let’s update the two rows in the orders table in the sourcedb database in PostgreSQL;

updated orders table with update made to orderId 2 and 4

Let’s query the postgresql_public_orders table in BigQuery

updated postresql_public_orders query result

Looking at the query result, the orderid and ordervalue columns in the after column are the current state of the orderId and orderValue columns in the orders table in the source system database while the initial state of data records of orderid 2 and orderid 4 are now in the before column.

Let’s delete two records from the orders table in the sourcedb database in PostgreSQL.

updated orders table with deletion made to orderId 1 and 5 data records

Let’s query the postgresql_public_orders table in BigQuery

updated postresql_public_orders query result

Looking at the query result, there are no longer data records for orderId 1 and orderId 5 in the postgresql_public_orders table. The after column orderid and ordervalue columns are the current state of the orderId and orderValue in the orders table in the source system.

Messages were produced to the topic (postgresql.public.orders) while carrying out the INSERT, UPDATE and DELETE operations. You can view it here.

In closing, I have demonstrated the change data capture approach to data ingestion. This demonstration is a log-based change data capture because I made use of the Debezium Source Connector. There are other connectors that can be used like the JDBC Source Connector for query-based change data capture.

The change data capture approach to data ingestion is scalable for frequent changes to the source system tables.

Thank you to everyone that has followed me through this series.

Until next time, be good everyone.

--

--

Oladayo
CodeX
Writer for

data 📈, space 🚀🛰, augmented reality 👓 and photography 📸 enthusiast.