Implementing Change Data Capture with Debezium, PostgreSQL, and Google Cloud Pub/Sub Emulator using .NET for Data Consumption

David Bytyqi
Real-Time Data Pipeline
4 min read · Feb 25, 2024

Overview

In this article, we integrate PostgreSQL with Debezium for Change Data Capture (CDC), use the Google Cloud Pub/Sub emulator as the message broker, and build a C# application that processes the streamed changes. Because every component runs locally, the setup is well suited to developing and testing real-time data processing pipelines.

Setting Up the Development Environment

Docker Compose Configuration

We start by defining a Docker Compose configuration that runs the PostgreSQL database, the Pub/Sub emulator, and the Debezium connector. Compose places the services on a shared network, so each one can reach the others by service name: Debezium connects to postgres:5432 and publishes to pubsub-emulator:8085.

Environment Setup Steps

  • Install prerequisites: Docker, Docker Compose, and Google Cloud SDK, plus the .NET SDK to build the C# subscriber.
  • Prepare configuration files: place docker-compose.yml in the project root and the Debezium Server configuration in ./conf/application.properties.
  • Launch services: run docker-compose up to start PostgreSQL, Debezium, and the Pub/Sub emulator.
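Before moving on, it can help to confirm that the published ports are accepting connections. The following is a minimal C# sketch, assuming the default ports from the Compose file (5432 for PostgreSQL, 8085 for the emulator); PortProbe and IsListening are hypothetical names. A successful TCP connect only shows that the port is open, not that the service has finished initializing.

```csharp
using System;
using System.Net.Sockets;

// Hypothetical helper: reports whether something accepts TCP connections
// on host:port within the given timeout.
public static class PortProbe
{
    public static bool IsListening(string host, int port, int timeoutMs = 2000)
    {
        using var client = new TcpClient();
        try
        {
            // Wait returns false on timeout; a refused connection faults the task.
            return client.ConnectAsync(host, port).Wait(timeoutMs) && client.Connected;
        }
        catch (Exception ex) when (ex is AggregateException || ex is SocketException)
        {
            // Connection refused, host unreachable, or name resolution failed.
            return false;
        }
    }
}
```

After docker-compose up, PortProbe.IsListening("localhost", 8085) should return true once the emulator has bound its port.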

Debezium Configuration for PostgreSQL CDC

Debezium Configuration Essentials

The heart of CDC lies in Debezium’s configuration, where we define the source database, specify the CDC parameters, and set up the destination for the captured changes, in this case, the Google Cloud Pub/Sub emulator.

Understanding CDC with Debezium

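Debezium reads PostgreSQL's write-ahead log through a logical replication slot (here using the built-in pgoutput plugin) and turns each committed insert, update, and delete into a structured change event that carries the row's state before and after the change. Downstream applications can consume these events for real-time data integration and analytics.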
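To make the event structure concrete, here is a minimal C# sketch that extracts the operation code and the row images from a Debezium JSON change event with System.Text.Json. It assumes the standard Debezium envelope (payload.op, payload.before, payload.after) and accepts events with or without the outer schema wrapper; ChangeEvent and DebeziumEnvelope are hypothetical types, not part of any Debezium library.

```csharp
using System.Text.Json;

// Hypothetical, minimal view of a Debezium change event.
public sealed record ChangeEvent(string Op, string? BeforeJson, string? AfterJson);

public static class DebeziumEnvelope
{
    public static ChangeEvent Parse(string json)
    {
        using JsonDocument doc = JsonDocument.Parse(json);
        JsonElement root = doc.RootElement;

        // With schemas enabled the event looks like {"schema": ..., "payload": ...};
        // without schemas the payload fields sit at the top level.
        JsonElement payload = root.TryGetProperty("payload", out JsonElement wrapped) ? wrapped : root;

        string op = payload.GetProperty("op").GetString() ?? string.Empty;
        return new ChangeEvent(op, RawOrNull(payload, "before"), RawOrNull(payload, "after"));
    }

    // Raw JSON text of a row image, or null when the field is missing or JSON null.
    private static string? RawOrNull(JsonElement payload, string name) =>
        payload.TryGetProperty(name, out JsonElement element) && element.ValueKind != JsonValueKind.Null
            ? element.GetRawText()
            : null;
}
```

Debezium uses "c" for inserts, "u" for updates, "d" for deletes, and "r" for rows read during the initial snapshot, so an insert has a null before image and a delete has a null after image.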

Google Cloud Pub/Sub Emulator Integration

Configuring the Pub/Sub Emulator

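The Pub/Sub emulator runs the Google Cloud Pub/Sub API locally, so you can develop and test without a Google Cloud project or credentials. Both clients must point at it: Debezium through debezium.sink.pubsub.address (pubsub-emulator:8085 inside the Compose network), and the C# subscriber through its endpoint setting (localhost:8085 from the host, since the port is published).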
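One way to avoid hard-coding the emulator address in the subscriber is to read the PUBSUB_EMULATOR_HOST environment variable, the same variable the Compose file sets for the emulator and Debezium containers. The sketch below is an assumption-based helper (EmulatorEndpoint is a hypothetical name) that falls back to localhost:8085 and can add the http:// prefix used by the endpoint string in the article's subscriber.

```csharp
using System;

// Hypothetical helper for locating the Pub/Sub emulator.
public static class EmulatorEndpoint
{
    // Matches the port published in docker-compose.yml.
    public const string DefaultHost = "localhost:8085";

    // Reads PUBSUB_EMULATOR_HOST via the supplied lookup; falls back to DefaultHost.
    public static string ResolveHost(Func<string, string?> getEnv)
    {
        string? value = getEnv("PUBSUB_EMULATOR_HOST");
        return string.IsNullOrWhiteSpace(value) ? DefaultHost : value.Trim();
    }

    // Convenience overload that reads the real process environment.
    public static string ResolveHost() => ResolveHost(Environment.GetEnvironmentVariable);

    // The subscriber in this article passes its endpoint with an http:// scheme.
    public static string ToHttpEndpoint(string host) => $"http://{host}";
}
```

Injecting the lookup function keeps the helper testable without touching the real environment.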

Setting Up the Pub/Sub Environment

We’ll cover the steps to ensure the Pub/Sub emulator is correctly integrated within our environment, allowing Debezium to publish events and the C# application to subscribe to them.

Building the C# Subscriber Application

Running the Application

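To run the subscriber, create a console project (dotnet new console), add the Google.Cloud.PubSub.V1 NuGet package (dotnet add package Google.Cloud.PubSub.V1), replace Program.cs with the subscriber code shown later in this article, and start it with dotnet run once the topic and subscription exist.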
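The subscriber hard-codes its project, subscription, and endpoint. If you prefer to pass them on the command line, a small options type is enough; this is a sketch with a hypothetical SubscriberOptions record whose defaults mirror the values used in the article's Program.cs.

```csharp
using System;

// Hypothetical settings record; defaults match the article's hard-coded values.
public sealed record SubscriberOptions(string ProjectId, string SubscriptionId, string Endpoint)
{
    // Positional arguments: [projectId] [subscriptionId] [endpoint].
    public static SubscriberOptions FromArgs(string[] args) => new(
        args.Length > 0 ? args[0] : "test-project",
        args.Length > 1 ? args[1] : "inventory-orders-subscription",
        args.Length > 2 ? args[2] : "http://localhost:8085");
}
```

In Main you could then call SubscriberOptions.FromArgs(args) and pass its properties to SubscribeAsync, running the program with dotnet run -- test-project inventory-orders-subscription http://localhost:8085.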

Dockerized Project with Complete Configuration

The Docker Compose file below defines three services (PostgreSQL, the Pub/Sub emulator, and Debezium Server), each with its image, environment variables, ports, and volumes.

docker-compose.yml

services:
  postgres:
    image: postgres:latest
    environment:
      POSTGRES_PASSWORD: postgres
    ports:
      - "5432:5432"
    # Logical WAL level is required for Debezium's pgoutput plugin.
    command: ["postgres", "-c", "wal_level=logical"]

  pubsub-emulator:
    image: google/cloud-sdk:emulators
    command: gcloud beta emulators pubsub start --project=test-project --host-port=0.0.0.0:8085
    environment:
      - PUBSUB_EMULATOR_HOST=pubsub-emulator:8085
      - PUBSUB_PROJECT_ID=test-project
    ports:
      - "8085:8085"
    volumes:
      - ./gcloud-config/configuration:/root/.config/gcloud

  debezium:
    image: debezium/server:2.5
    depends_on:
      - postgres
      - pubsub-emulator
    environment:
      - PUBSUB_EMULATOR_HOST=pubsub-emulator:8085
      - PUBSUB_EMULATOR_NO_AUTH=true
    volumes:
      # application.properties is read from ./conf
      - ./conf:/debezium/conf
      - ./data:/debezium/data

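The following Debezium Server configuration, saved as ./conf/application.properties (mounted into the container at /debezium/conf), captures changes from the inventory schema, reroutes every table's events to a single inventory.orders topic, and publishes them to the Pub/Sub emulator as JSON.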

application.properties

debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=test-project
debezium.sink.pubsub.address=pubsub-emulator:8085
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=/tmp/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.database.server.name=tutorial
debezium.source.topic.prefix=streamio23
debezium.source.schema.include.list=inventory
debezium.snapshot.new.tables=parallel
debezium.source.plugin.name=pgoutput
debezium.transforms=Reroute
debezium.source.tombstones.on.delete=false
debezium.transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
debezium.transforms.Reroute.topic.regex=(.*)inventory(.*)
debezium.transforms.Reroute.topic.replacement=inventory.orders
debezium.source.value.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.value.converter.schemas.enable=true
debezium.source.database.history.file.filename=/tmp/FileDatabaseHistory.dat
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
pk.mode=record_key
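The Reroute transform's effect on topic names can be approximated in a few lines of C#. Debezium names topics <topic.prefix>.<schema>.<table>, so the orders table would normally go to streamio23.inventory.orders. This sketch assumes, as with Kafka Connect's regex-based routers, that the regex must match the whole topic name before the replacement is applied; TopicRouter is a hypothetical helper, not Debezium code.

```csharp
using System.Text.RegularExpressions;

// Approximation of the Reroute (ByLogicalTableRouter) topic mapping.
public static class TopicRouter
{
    public static string Route(string topic, string topicRegex, string replacement)
    {
        // Anchor the pattern to mimic a full-string match on the topic name.
        var regex = new Regex($"^(?:{topicRegex})$");

        // Topics that do not match keep their original name.
        return regex.IsMatch(topic) ? regex.Replace(topic, replacement, 1) : topic;
    }
}
```

Because the replacement contains no group references, every table in the inventory schema lands in the same inventory.orders topic; by default the real transform also adjusts the record key so rows from different source tables stay distinguishable.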

Once the emulator container defined in the Docker Compose file is running, create the topic and the subscription. The topic name must match the debezium.transforms.Reroute.topic.replacement value (inventory.orders).

curl -s -X PUT 'http://localhost:8085/v1/projects/test-project/topics/inventory.orders'
curl -s -X PUT 'http://localhost:8085/v1/projects/test-project/subscriptions/inventory-orders-subscription' \
-H 'Content-Type: application/json' \
-d '{
"topic": "projects/test-project/topics/inventory.orders"
}'
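The same two calls can be issued from .NET with HttpClient, for example as part of test setup. This sketch mirrors the curl commands above against the emulator's REST endpoint; EmulatorAdmin and its methods are hypothetical names, and the 409 status on re-runs is an assumption based on Pub/Sub's ALREADY_EXISTS behavior.

```csharp
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

// Hypothetical helper that builds and sends the emulator admin requests.
public static class EmulatorAdmin
{
    // PUT /v1/projects/{project}/topics/{topic}, like the first curl command.
    public static HttpRequestMessage CreateTopicRequest(string baseUrl, string project, string topic) =>
        new(HttpMethod.Put, $"{baseUrl}/v1/projects/{project}/topics/{topic}");

    // PUT /v1/projects/{project}/subscriptions/{subscription} with the topic in a JSON body.
    public static HttpRequestMessage CreateSubscriptionRequest(
        string baseUrl, string project, string subscription, string topic)
    {
        string body = $"{{\"topic\":\"projects/{project}/topics/{topic}\"}}";
        return new(HttpMethod.Put, $"{baseUrl}/v1/projects/{project}/subscriptions/{subscription}")
        {
            Content = new StringContent(body, Encoding.UTF8, "application/json")
        };
    }

    // Sends the topic request first, then the subscription request, and
    // returns the HTTP status codes (typically 200, or 409 if it already exists).
    public static async Task<List<int>> CreateTopicAndSubscriptionAsync(
        HttpClient http, string baseUrl, string project, string topic, string subscription)
    {
        var statusCodes = new List<int>();
        foreach (HttpRequestMessage request in new[]
        {
            CreateTopicRequest(baseUrl, project, topic),
            CreateSubscriptionRequest(baseUrl, project, subscription, topic),
        })
        {
            using (request)
            using (HttpResponseMessage response = await http.SendAsync(request))
            {
                statusCodes.Add((int)response.StatusCode);
            }
        }
        return statusCodes;
    }
}
```

Usage: await EmulatorAdmin.CreateTopicAndSubscriptionAsync(new HttpClient(), "http://localhost:8085", "test-project", "inventory.orders", "inventory-orders-subscription").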

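The following commands open a shell in the PostgreSQL container, start psql, create an Inventory schema with an Orders table inside it, and insert a test record. Because the identifiers are unquoted, PostgreSQL folds them to lowercase, so the table is stored as inventory.orders, which is what debezium.source.schema.include.list=inventory matches.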

docker exec -it <<postgres-container>> bash
psql -U postgres

CREATE SCHEMA Inventory;
CREATE TABLE Inventory.Orders(Id serial primary key, name varchar(20));
-- Let the serial column assign Id so its sequence stays in sync with the data.
INSERT INTO Inventory.Orders(name) VALUES ('test 1');

This asynchronous C# program opens a streaming pull on the emulator subscription, prints each message, acknowledges it, and skips duplicate deliveries it has already processed (Pub/Sub delivers messages at least once, so duplicates are possible).

using Google.Api.Gax.Grpc;
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        string projectId = "test-project";
        string subscriptionId = "inventory-orders-subscription";
        string endpoint = "http://localhost:8085";

        await SubscribeAsync(projectId, subscriptionId, endpoint);
    }

    static async Task SubscribeAsync(string projectId, string subscriptionId, string endpoint)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        // The emulator uses neither TLS nor authentication, so connect with insecure credentials.
        SubscriberServiceApiClient subscriberServiceApiClient = new SubscriberServiceApiClientBuilder
        {
            Endpoint = endpoint,
            ChannelCredentials = ChannelCredentials.Insecure
        }.Build();

        var streamingPullRequest = new StreamingPullRequest
        {
            SubscriptionAsSubscriptionName = subscriptionName,
            StreamAckDeadlineSeconds = 10
        };

        // Stop the pull loop cleanly on Ctrl+C.
        var cancellationTokenSource = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true;
            cancellationTokenSource.Cancel();
        };

        // A redelivered message keeps its MessageId but gets a new AckId,
        // so deduplicate on MessageId. This set grows for the life of the process.
        var processedMessageIds = new HashSet<string>();

        using (var streamingPull = subscriberServiceApiClient.StreamingPull())
        {
            // The first request on the stream names the subscription.
            await streamingPull.WriteAsync(streamingPullRequest);

            try
            {
                await foreach (var response in streamingPull.GetResponseStream().WithCancellation(cancellationTokenSource.Token))
                {
                    foreach (var message in response.ReceivedMessages)
                    {
                        string data = message.Message.Data.ToStringUtf8();

                        if (processedMessageIds.Add(message.Message.MessageId))
                        {
                            Console.WriteLine($"Received message: {data}");
                        }
                        else
                        {
                            Console.WriteLine($"Skipping duplicate message: {data}");
                        }

                        // Acknowledge duplicates too; otherwise they are redelivered after the ack deadline.
                        await subscriberServiceApiClient.AcknowledgeAsync(subscriptionName, new[] { message.AckId });
                    }
                }
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Subscriber stopped.");
            }
            catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
            {
                Console.WriteLine("Subscriber stopped.");
            }
        }
    }
}
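The subscriber keeps every processed message identifier in a HashSet for the lifetime of the process, which is fine for a demo but grows without limit in a long-running consumer. One option is a bounded set; the sketch below uses a hypothetical RecentIdSet type with first-in, first-out eviction and a capacity you choose, under the assumption that duplicates arrive within that window.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical bounded set: remembers the most recent `capacity` IDs and
// evicts the oldest first (FIFO, not least-recently-used).
public sealed class RecentIdSet
{
    private readonly int _capacity;
    private readonly HashSet<string> _ids = new();
    private readonly Queue<string> _order = new();

    public RecentIdSet(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _capacity = capacity;
    }

    // Returns true and records the ID if it was not seen recently; false otherwise.
    public bool TryAdd(string id)
    {
        if (!_ids.Add(id)) return false;

        _order.Enqueue(id);
        if (_order.Count > _capacity)
        {
            // Forget the oldest ID to keep memory bounded.
            _ids.Remove(_order.Dequeue());
        }
        return true;
    }

    public bool Contains(string id) => _ids.Contains(id);
}
```

It can stand in for the HashSet by calling TryAdd where the subscriber calls Add; the trade-off is that a duplicate arriving after its ID has been evicted is processed again, so downstream handling should still tolerate replays.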

Summary

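PostgreSQL runs with wal_level=logical, Debezium Server reads its changes through the pgoutput plugin, the ByLogicalTableRouter transform sends every table in the inventory schema to the inventory.orders topic on the Pub/Sub emulator, and a C# subscriber consumes that topic through a streaming pull, acknowledging each message. Because everything runs locally in Docker, the whole CDC pipeline can be developed and tested without a Google Cloud project.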

