Real-time Data Streaming: From PostgreSQL to Elasticsearch via Kafka and Debezium

Cagataygokcel
6 min read · Aug 22, 2023


In the fast-paced realm of modern business, effective data management is pivotal. At Trendyol, we ran into data inconsistencies while using Couchbase, and the problem was most visible when we tried to meet our clients' intricate filtering demands across diverse fields. Team PLM therefore moved to a tailored solution built on PostgreSQL, Debezium, and Kafka: PostgreSQL serves as the robust system of record, while Debezium and Kafka capture and stream every change in real time.

In this post we walk through that solution end to end, from the core challenge of detailed filtering across many fields to the pipeline that carries changes from PostgreSQL through Kafka into Elasticsearch, bridging a rigid relational store with dynamic business needs.

Connecting PostgreSQL to Kafka with Debezium

Step 1: Pull and Start the Debezium Connect Docker Image

  1. Open a Terminal or Command Prompt.
  2. Pull the Debezium Connect Docker image, which ships with the PostgreSQL connector, using the following command:
docker pull debezium/connect

3. Start the Debezium Connect container using the following command. The worker's REST API listens on port 8083 inside the container; here it is mapped to port 8080 on the host. (The source PostgreSQL server itself must have logical decoding enabled, i.e. wal_level=logical; the debezium/postgres image provides this out of the box.)

docker run -it --rm --name debezium-connect \
  -p 8080:8083 \
  -e BOOTSTRAP_SERVERS=kafka:9092 \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my-connect-configs \
  -e OFFSET_STORAGE_TOPIC=my-connect-offsets \
  -e STATUS_STORAGE_TOPIC=my-connect-status \
  -e CONFIG_STORAGE_REPLICATION_FACTOR=1 \
  -e OFFSET_STORAGE_REPLICATION_FACTOR=1 \
  -e STATUS_STORAGE_REPLICATION_FACTOR=1 \
  debezium/connect:latest
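
Before registering a connector, it is worth confirming that the Connect worker's REST API is reachable and that the Debezium PostgreSQL connector plugin is loaded. With the port mapping above, a quick check might look like this:

# Verify the Connect worker is up and the PostgreSQL connector plugin is available
curl -s http://localhost:8080/connector-plugins
# The response should include io.debezium.connector.postgresql.PostgresConnector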

Step 2: Create and Configure Debezium Connector Registration

  1. Create a Debezium connector registration via an API or a command-line tool like curl. You can use the following example POST request to create the connector registration:
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "sample-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "sample_user",
    "database.password": "sample_pass",
    "database.dbname": "sample_db",
    "database.server.name": "sample_servername",
    "table.include.list": "sample_schema.sample_table",
    "topic.prefix": "sample.topic.prefix",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "schema.include.list": "sample_schema",
    "transforms.reroute_topic.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.reroute_topic.key.enforce.uniqueness": "false",
    "transforms.reroute_topic.topic.regex": "sample_reroute_source_topic",
    "transforms.reroute_topic.topic.replacement": "sample_reroute_target_topic",
    "transforms": "unwrap,reroute_topic",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "snapshot.mode": "initial",
    "decimal.format": "NUMERIC",
    "json.output.decimal.format": "NUMERIC",
    "decimal.handling.mode": "string"
  }
}' http://localhost:8080/connectors
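
If the request succeeds, the Connect REST API can also be used to confirm that the connector and its task are running:

# Check the state of the newly registered Debezium connector
curl -s http://localhost:8080/connectors/sample-connector/status
# A healthy connector reports "state": "RUNNING" for both the connector and its task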

Meaning of Variables:

  • connector.class: Specifies that Debezium's PostgreSQL connector is used.
  • database.hostname: Specifies the address of the PostgreSQL database server.
  • database.port: Specifies the connection port of the PostgreSQL database server.
  • database.user: Specifies the username for the database.
  • database.password: Specifies the password of the user.
  • database.dbname: Specifies the name of the database to connect to.
  • database.server.name: Specifies the logical server name Debezium uses when sending data to Kafka.
  • table.include.list: Specifies the table to be tracked (e.g., sample_schema.sample_table).
  • schema.include.list: Specifies the schema whose changes should be captured (e.g., sample_schema).
  • topic.prefix: Specifies the prefix for the Kafka topics to be created.
  • plugin.name: Specifies the PostgreSQL logical decoding plugin Debezium uses (here, pgoutput).
  • slot.name: Specifies the name of the PostgreSQL replication slot Debezium creates and reads changes from.
  • transforms: Specifies the transformations applied to the records, here unwrap and reroute_topic.
  • transforms.unwrap.type: Specifies how records are unwrapped; ExtractNewRecordState flattens Debezium's change envelope so only the new row state is forwarded.
  • transforms.reroute_topic.*: Configures Debezium's ByLogicalTableRouter, which reroutes events whose original topic matches topic.regex to the topic named in topic.replacement (here, sample_reroute_target_topic).
  • snapshot.mode: Specifies the mode for taking an initial snapshot; initial snapshots the table once before streaming changes.

Example Data Model and Changes in the Table

Let’s consider an example data model for the sample_table:

CREATE TABLE sample_schema.sample_table (
  id SERIAL PRIMARY KEY,
  name VARCHAR(255),
  age INT
);

Changes in the sample_table such as new row inserts or existing row updates are tracked by Debezium and propagated to Kafka topics. For instance, if a new row is inserted or an existing row is updated, Debezium captures these changes and sends them to the respective Kafka topic.
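
For example, assuming the table above, statements like the following would each produce a change event (the values are purely illustrative):

-- A newly inserted row is captured by Debezium as a create event
INSERT INTO sample_schema.sample_table (name, age) VALUES ('John', 25);

-- An update to an existing row is captured as an update event
UPDATE sample_schema.sample_table SET age = 26 WHERE name = 'John';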

Transferring Data to Kafka Topics

Changes in the database are sent to Kafka topics. The Kafka topic named sample_reroute_target_topic is created by Debezium. This topic represents changes from the sample_table. Any updates, insertions, or deletions in the table are sent to this topic as JSON messages.
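
To watch these messages arrive, you can consume the topic with the Kafka console consumer. The exact invocation depends on how Kafka is deployed; with a Dockerized broker named kafka (an assumption for this example) it might look like this:

# Read the change events that Debezium writes to the rerouted topic
docker exec -it kafka kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic sample_reroute_target_topic \
  --from-beginning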

We have now successfully connected the PostgreSQL database to Kafka using Debezium and understood how changes in the sample_table are captured and transferred to Kafka topics. In the next step, we will cover the process of transferring data from Kafka to Elasticsearch.

Transferring Kafka Data to Elasticsearch

Step 1: Create Elasticsearch Sink Connector

  1. Open a Terminal or Command Prompt.
  2. Pull the required Docker image for the Elasticsearch Sink Connector using the following command:
docker pull confluentinc/cp-kafka-connect:latest

3. Start the Kafka Connect container that will host the Elasticsearch Sink Connector using the following command. Note that the stock cp-kafka-connect image may not bundle the Elasticsearch connector; if io.confluent.connect.elasticsearch.ElasticsearchSinkConnector does not show up among the worker's plugins, install it first (for example with confluent-hub install confluentinc/kafka-connect-elasticsearch:latest).

docker run -it --rm --name es-sink-connector \
  -p 8083:8083 \
  -e CONNECT_BOOTSTRAP_SERVERS=kafka:9092 \
  -e CONNECT_REST_PORT=8083 \
  -e CONNECT_GROUP_ID="connect-cluster" \
  -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
  -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
  -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
  -e CONNECT_PLUGIN_PATH="/usr/share/java,/etc/kafka-connect/jars" \
  confluentinc/cp-kafka-connect:latest
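
As with the Debezium worker, you can verify that this Connect instance is up and that the Elasticsearch sink plugin is installed before registering the connector:

# Confirm the worker's REST API answers and the Elasticsearch sink plugin is present
curl -s http://localhost:8083/connector-plugins
# The response should include io.confluent.connect.elasticsearch.ElasticsearchSinkConnector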

Step 2: Configure Elasticsearch Sink Connector

  1. Register the Elasticsearch Sink Connector via an API or a command-line tool like curl. You can create the connector registration using the following example POST request:
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "sample-es-connector",
  "config": {
    "type.name": "_doc",
    "connector.client.config.override.policy": "All",
    "consumer.override.group.id": "sample_consumer_group_id",
    "consumer.override.client.id": "sample_client_id",
    "name": "sample-es-connector",
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "sample_reroute_target_topic",
    "connection.url": "elastic_url",
    "max.retries": "2",
    "retry.backoff.ms": "3000",
    "key.ignore": "false",
    "schema.ignore": "true",
    "schemas.enable": "false",
    "behavior.on.null.values": "DELETE",
    "transforms": "extractKey",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "id",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}' http://localhost:8083/connectors
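
Once the connector is created, you can check its status and, after a few records have flowed through, confirm that documents are being indexed (elastic_url is the same placeholder used in the configuration above):

# Verify the Elasticsearch sink connector and its task are running
curl -s http://localhost:8083/connectors/sample-es-connector/status

# Confirm documents are landing in the target index
curl -s "elastic_url/sample_reroute_target_topic/_search?size=1&pretty"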

Explanation of Key Properties:

  • connector.client.config.override.policy: Specifies which client configurations a connector is allowed to override. Setting it to "All" lets the consumer.override.* settings below take effect.
  • consumer.override.group.id: Specifies the Kafka consumer group that the Elasticsearch Sink Connector joins.
  • consumer.override.client.id: Specifies the client ID the connector uses when communicating with Kafka.
  • connector.class: Specifies the connector class to be used; io.confluent.connect.elasticsearch.ElasticsearchSinkConnector is the Elasticsearch sink provided by Confluent.
  • tasks.max: Specifies the maximum number of tasks that will run concurrently.
  • topics: Specifies the Kafka topics to be transferred to Elasticsearch; this should be the topic Debezium produces to (here, sample_reroute_target_topic).
  • connection.url: Specifies the connection URL of the Elasticsearch cluster.
  • transforms.extractKey.field: Specifies the field extracted from the record key and used as the Elasticsearch document _id. Here the id field is used, so a key of {"id": 123} becomes document _id 123.

Updating Data and Reflecting Changes in Elasticsearch

Let’s delve into a practical scenario to better understand how updates to the age field in the sample_table are seamlessly transmitted through the pipeline, from PostgreSQL to Elasticsearch.

  1. Scenario: Consider a record in the sample_table with the following information:
{
  "id": 123,
  "name": "Icardi",
  "age": 28
}

2. Update: Now, let’s say that Icardi’s age is updated from 28 to 30. This change triggers an update in the sample_table.
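
In SQL terms, this change could be as simple as the statement below (the id comes from the example record above):

-- The update that triggers the change event described in the next step
UPDATE sample_schema.sample_table SET age = 30 WHERE id = 123;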

3. Change Capture: Debezium captures the change and translates it into a change event. At the capture stage, the event carries both the old and the new row state, something like:

{
  "before": {
    "id": 123,
    "name": "Icardi",
    "age": 28
  },
  "after": {
    "id": 123,
    "name": "Icardi",
    "age": 30
  },
  "source": {
    // Debezium source details
  }
}

Because the ExtractNewRecordState (unwrap) transform is configured on the connector, the message that actually lands on the Kafka topic is flattened to just the new row state.

4. Kafka Topic: This translated message is then dispatched to the Kafka topic we’ve configured, such as sample_reroute_target_topic.

5. Elasticsearch Sink: The Elasticsearch Sink Connector detects this message from the Kafka topic. It extracts the relevant data, such as the id and the updated age.

6. Elasticsearch Indexing: Finally, the Elasticsearch Sink Connector transforms this extracted information into a format that Elasticsearch understands. Elasticsearch indexes the updated document under the appropriate index and type. For example:

{
  "_index": "sample_reroute_target_topic",
  "_type": "_doc",
  "_id": "123",
  "_version": 2,
  "_score": null,
  "_source": {
    "id": 123,
    "name": "Icardi",
    "age": 30
  },
  "fields": {
    "id": [123]
  },
  "sort": [
    123
  ]
}
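
You can confirm the result directly against Elasticsearch by fetching the document by its id (again, elastic_url stands in for your cluster address):

# Fetch the updated document; the _source should now show age 30
curl -s "elastic_url/sample_reroute_target_topic/_doc/123?pretty"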

In closing, we have walked through the machinery that drives real-time data synchronization from PostgreSQL to Elasticsearch, powered by Kafka and Debezium. With this pipeline in place, database changes reach the search layer within moments, so filtering and search results stay consistent and decisions can be made on fresh data. As your data needs evolve, these tools give you a solid foundation to build on.
