CDC with Debezium, Kafka, Spring Boot 3 and Postgres

MEHMET ARİF EMRE ŞEN · Published in Yazilim VIP · Jan 14, 2024

This article explores a CDC implementation using Debezium, Apache Kafka, Spring Boot 3.2.1, and PostgreSQL, focusing on the technical details. We’ll examine the architecture and configuration from the database all the way to the Kafka consumer.

In this example, our application utilizes two tables: “person” and “audit”. Operations performed on the “person” table, such as creating a record, trigger Debezium to publish corresponding change events to Kafka. These events are then consumed and recorded in the “audit” table for tracking and analysis.

Source Code

Overview of The Technologies

  • Java Development Kit (JDK): 21
  • Gradle: 8.5
  • IntelliJ IDEA as the IDE
  • Docker
  • Spring Boot: 3.2.1

Our CDC architecture includes:

  • Debezium: Configured for capturing changes in PostgreSQL, linking directly to Kafka.
  • Apache Kafka: Set up for efficient data streaming; we’ll cover key configuration details.
  • Spring Boot: Used for building the services that consume from Kafka.
  • PostgreSQL: Configured to work with Debezium for data capture.

For more configuration details, please see my previous article: Streaming with Debezium, Kafka and Docker

Step 1: Preparing the Environment with Docker

Kick-start your journey by launching the necessary services using Docker. We use a docker-compose file to create the containers.

See: docker-compose.yaml

Our docker-compose file consists of the following containers (an abridged sketch of the file follows the list):

  • database: Hosts the PostgreSQL database.
  • zookeeper: Manages cluster coordination and state for Kafka.
  • kafka: Runs the Apache Kafka broker for data streaming.
  • connect: Runs Kafka Connect with the Debezium connector, capturing changes from PostgreSQL.
  • kafka-ui: GUI to monitor Kafka clusters.
  • debezium-ui: GUI for configuring and monitoring Debezium connectors.
  • setup-debezium-connector: A temporary container used to create the initial Debezium PostgreSQL connector.
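
To give a sense of the layout, here is an abridged, illustrative sketch of such a compose file. The image tags, ports, and credentials here are assumptions; refer to docker-compose.yaml in the repo for the actual values. One detail worth highlighting: Debezium’s pgoutput plugin requires PostgreSQL to run with wal_level=logical.

# Abridged, illustrative sketch; see docker-compose.yaml for the real file
version: "3.8"
services:
  database:
    image: postgres:15                                # illustrative tag
    command: ["postgres", "-c", "wal_level=logical"]  # logical decoding, required by Debezium
    environment:
      POSTGRES_PASSWORD: postgres
  connect:
    image: debezium/connect:2.4                       # illustrative tag
    depends_on: [database, kafka]
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets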

Now, just run the following command to start the containers.

docker-compose up -d

Please wait a minute, then verify that all containers are operational and healthy. Every container should be up and running except “setup-debezium-connector”, which exits once it has registered the connector.
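
You can check the container states from the command line; the exited setup container is expected:

docker-compose ps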

The topics you should initially have on Kafka are shown in the Kafka UI screenshot below.

The Debezium PostgreSQL connector configuration is defined inside the “docker-compose” file.

See: docker-compose.yaml

{
  "name": "postgresql-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "connector.displayName": "PostgreSQL",
    "topic.prefix": "pg-changes",
    "database.user": "postgres",
    "database.dbname": "db_local",
    "table.exclude.list": "audit",
    "database.hostname": "database",
    "database.password": "postgres",
    "name": "postgresql-connector",
    "connector.id": "postgres",
    "plugin.name": "pgoutput"
  }
}

On Debezium UI, you should see postgresql-connector with the status RUNNING as in the screenshot below.
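
Alternatively, the Kafka Connect REST API reports the same status. Assuming Connect publishes its REST interface on the default port 8083, a request like the following should return a RUNNING state:

curl http://localhost:8083/connectors/postgresql-connector/status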

Step 2: Configuring the Spring Boot Project

Step 2.1: Create an empty project

If you are not familiar with Gradle, the following links may help:

What is Gradle?
Building Java Applications Sample

Step 2.2: Configuring build.gradle.kts

Let’s start with configuring our Gradle build script. We will use the Kotlin DSL for the Gradle build scripts. The full version is on my GitHub repo.

See: build.gradle.kts

  1. We use the Spring Boot plugin for configuration and the Dependency Management plugin for version consistency.
id("org.springframework.boot") version "3.2.1"
id("io.spring.dependency-management") version "1.1.4"

2. Configure dependencyManagement to import the Spring Cloud BOM, ensuring version compatibility of all Spring Cloud components.

dependencyManagement {
    imports {
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}")
    }
}
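
Note that the springCloudVersion variable must be declared in the build script. As an assumption, for Spring Boot 3.2.x the matching Spring Cloud release train is 2023.0.x; check the Spring Cloud compatibility matrix for your exact Boot version.

// Assumed declaration; align with your Spring Boot version
val springCloudVersion = "2023.0.0"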

3. Add the necessary dependencies.

// For creating web applications with RESTful support
implementation("org.springframework.boot:spring-boot-starter-web")

// Handles JSON operations in the application
implementation("org.springframework.boot:spring-boot-starter-json")

// Integrates Spring Data JPA and PostgreSQL driver for database interactions
implementation("org.springframework.boot:spring-boot-starter-data-jpa")
implementation("org.postgresql:postgresql")

// For integrating and testing with Apache Kafka using Spring Cloud Stream
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
testImplementation("org.springframework.cloud:spring-cloud-stream-test-binder")

// Other (Optional): Lombok for reducing boilerplate code
compileOnly("org.projectlombok:lombok:$lombokVersion")
annotationProcessor("org.projectlombok:lombok:$lombokVersion")
testCompileOnly("org.projectlombok:lombok:$lombokVersion")
testAnnotationProcessor("org.projectlombok:lombok:$lombokVersion")

Step 2.3: Configuring the Spring Boot application

  1. The next step is configuring our “application.yaml” file. Here, we will configure Kafka to consume Debezium events. The full versions of the files exist on my GitHub repo.

See: application.yaml

spring:
  ...
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9093
      bindings:
        debeziumEventConsumer-in-0:
          destination: pg-changes.public.person
          group: 1
          content-type: application/json

Note that in our Debezium configuration we set “pg-changes” as the Kafka topic prefix, via the “topic.prefix” property in the connector configuration JSON above. Debezium names topics as <prefix>.<schema>.<table>: “public” is our database schema and “person” is the table’s name.
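
One more assumption worth flagging: with a single functional bean, Spring Cloud Stream binds it automatically, but if the application defines more than one Consumer or Function bean, you must tell the binder which one to use:

spring:
  cloud:
    function:
      definition: debeziumEventConsumer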

2. Let’s create the required Entity and Repository classes.

See: entities package
See: repositories package

// Lombok generates the no-arg constructor required by JPA and the
// all-args constructor used by the controller in Step 3
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
public class Person {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Long id;
    private String firstName;
    private String lastName;
}

// @Builder is required because the consumer creates records via Audit.builder()
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Entity
public class Audit {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Long id;
    private String tableName;
    private String operation;
    private LocalDateTime time;
    private String owner;
    private String value;
}

@Repository
public interface PersonRepository extends JpaRepository<Person, Long> {
}

@Repository
public interface AuditRepository extends JpaRepository<Audit, Long> {
}

3. Now, create the DTO to capture Debezium events.

See: DebeziumEventDto.java
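
Before the DTO itself, it helps to see the shape of a Debezium change event. Below is a simplified, abridged event for an insert into “person”; the field values are illustrative and many envelope fields are omitted:

{
  "payload": {
    "before": null,
    "after": { "id": 1, "first_name": "Maemre0.42", "last_name": "Sen0.17" },
    "source": {
      "db": "db_local",
      "schema": "public",
      "table": "person",
      "ts_ms": 1705230000000,
      "txId": 573
    },
    "op": "c"
  }
}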

@Data
public class DebeziumEventDto {

    private Payload payload;

    @Data
    public static class Payload {
        private Source source;
        private Map<String, Object> before;
        private Map<String, Object> after;

        @JsonProperty("op")
        @JsonDeserialize(using = OperationTypeDeserializer.class)
        private OperationType operationType;

        @Data
        public static class Source {
            @JsonProperty("ts_ms")
            @JsonDeserialize(using = MillisToLocalDateTimeDeserializer.class)
            private LocalDateTime transactionTime;

            private String db;
            private String schema;
            private String table;

            @JsonProperty("txId")
            private long transactionId;
        }
    }

    @Getter
    public enum OperationType {
        CREATE("c"),
        UPDATE("u"),
        DELETE("d"),
        READ("r");

        private final String code;

        OperationType(String code) {
            this.code = code;
        }

        public static OperationType fromCode(String code) {
            for (OperationType type : OperationType.values()) {
                if (type.getCode().equals(code)) {
                    return type;
                }
            }
            throw new IllegalArgumentException("Unknown operation type code: " + code);
        }
    }
}

4. Creating a Kafka Consumer.

See: DebeziumEventConsumer.java

@Slf4j
@RequiredArgsConstructor
@Component
public class DebeziumEventConsumer implements Consumer<DebeziumEventDto> {

    private final AuditRepository auditRepository;

    @Override
    public void accept(DebeziumEventDto eventDto) {
        try {
            final var payload = eventDto.getPayload();
            final var source = payload.getSource();
            final var audit = Audit.builder()
                    .tableName(source.getTable())
                    .operation(payload.getOperationType().name())
                    .time(source.getTransactionTime())
                    .value(Optional.ofNullable(payload.getAfter()).map(Object::toString).orElse(null))
                    .build();
            final Audit savedAudit = auditRepository.save(audit);
            log.info("{} event on {} table is audited as {}",
                    payload.getOperationType(),
                    source.getTable(),
                    savedAudit);
        } catch (Exception e) {
            log.error("Failed to process consumed message {}", eventDto, e);
        }
    }
}
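
A design note: Object::toString on the “after” map stores Java map syntax ({id=1, first_name=...}) in the audit record rather than JSON. If you would rather audit valid JSON, a small, hypothetical variation using Jackson could look like this:

// Sketch: serialize the "after" state as JSON instead of relying on Map.toString()
private final ObjectMapper objectMapper = new ObjectMapper();

private String toJson(Map<String, Object> state) {
    try {
        return state == null ? null : objectMapper.writeValueAsString(state);
    } catch (JsonProcessingException e) {
        return String.valueOf(state); // fall back to the map syntax
    }
}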

5. To use DebeziumEventDto, we need the following deserializers.

See: MillisToLocalDateTimeDeserializer.java
See: OperationTypeDeserializer.java

public class MillisToLocalDateTimeDeserializer extends JsonDeserializer<LocalDateTime> {
    @Override
    public LocalDateTime deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
        final long timestamp = p.getLongValue();
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneOffset.UTC);
    }
}

public class OperationTypeDeserializer extends JsonDeserializer<DebeziumEventDto.OperationType> {
    @Override
    public DebeziumEventDto.OperationType deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
        return DebeziumEventDto.OperationType.fromCode(jsonParser.getValueAsString());
    }
}
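
As a quick sanity check, you can push the abridged event from earlier through Jackson and confirm the custom deserializers kick in. This is a hypothetical snippet: sampleEventJson stands in for that JSON string, and it should run inside a test or a main method that declares throws Exception.

ObjectMapper mapper = new ObjectMapper()
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // tolerate unmapped envelope fields
DebeziumEventDto event = mapper.readValue(sampleEventJson, DebeziumEventDto.class);
System.out.println(event.getPayload().getOperationType()); // prints CREATE for an "op":"c" event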

Step 3: Testing the Application

Let’s create a controller class to create a random person. The final version can be found on my GitHub repo.

See: PersonRestController.java

@RequiredArgsConstructor
@RequestMapping("api/person")
@RestController
public class PersonRestController {

    private final PersonRepository personRepository;

    @GetMapping("/random")
    public ResponseEntity<Person> random() {
        final String randomFirstName = "Maemre" + Math.random();
        final String randomLastName = "Sen" + Math.random();
        return ResponseEntity.ok(personRepository.save(new Person(null, randomFirstName, randomLastName)));
    }
}

Now, let’s start our Spring Boot application. After startup, you should see the “pg-changes.public.person” topic on your Kafka cluster.

Kafka UI: http://localhost:9000

Call the http://localhost:8080/api/person/random API.
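
For example, from the command line:

curl http://localhost:8080/api/person/random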

If everything works fine, you should see the log from our Kafka consumer and the audit record in the “audit” table.

Debezium Event Kafka Consumer Log
Audit Table Data
