Integrating Apache Kafka in Laravel: Real-time Database Synchronization with Debezium Connector

Balwant Singh
Simform Engineering
5 min readJul 5, 2023

Empower your Laravel application with the latest and most accurate data by leveraging real-time database synchronization.

In today’s fast-paced digital world, real-time data synchronization is crucial for many web applications. In this tutorial, we will explore how to integrate Apache Kafka and the Debezium connector into a Laravel application to achieve efficient and reliable real-time data synchronization between two databases.

Prerequisites:

Laravel is installed on your local machine.

rdkafka php extension

Docker is installed to create a local development environment.

Basic knowledge of Laravel, Docker, and Apache Kafka concepts.

Setting up the development environment

We will begin by creating a local development environment using Docker. A Docker Compose file will be provided, which includes the necessary services such as ZooKeeper, Kafka, Schema Registry, MySQL, Connect Debezium, and Kafka Manager. With a few configurations and Docker container orchestration, our development environment will be ready for integration.

version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.2.0
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

kafka:
image: confluentinc/cp-kafka:6.2.0
hostname: kafka
container_name: kafka
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
CONFLUENT_METRICS_ENABLE: 'false'
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1

schema-registry:
image: confluentinc/cp-schema-registry:6.2.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8081'
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'

mysql:
image: debezium/example-mysql:1.2
container_name: mysql
ports:
- 3307:3306
environment:
- MYSQL_ROOT_PASSWORD=debezium
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
volumes:
- ${PWD}/data/movies.sql:/docker-entrypoint-initdb.d/z99_dump.sql

connect-debezium:
image: debezium/connect:1.6
container_name: connect-debezium
depends_on:
- kafka
- mysql
- schema-registry
ports:
- 8083:8083
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: connect-debezium
CONFIG_STORAGE_TOPIC: docker-connect-debezium-configs
OFFSET_STORAGE_TOPIC: docker-connect-debezium-offsets
STATUS_STORAGE_TOPIC: docker-connect-debezium-status
KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
volumes:
- ${PWD}/scripts:/scripts

kafka-manager:
image: hlebalbau/kafka-manager:latest
container_name: kafka-manager
depends_on:
- kafka
ports:
- "9000:9000"
environment:
- ZK_HOSTS=zookeeper:2181
- APPLICATION_SECRET=letmein

Also, the necessary scripts and SQL files are mentioned in this tutorial. You can access it from the GitHub repository I provided at the end of this blog. Inside the repository, you will find the scripts needed to run the Docker Compose setup, as well as the SQL file used for initializing the MySQL database. Feel free to explore the repository, clone it to your local environment, and adapt the code and configurations to suit your specific needs.

Installing Kafka dependencies in Laravel

Next, we will install the Kafka library in our Laravel application using Composer. This library provides the necessary tools and classes to interact with the Kafka broker. We will also configure Laravel to establish a connection with the Kafka broker, specifying the required configuration details.

composer require mateusjunges/laravel-kafka
php artisan vendor:publish --tag=laravel-kafka-config

Configuring the Debezium connector

The Debezium connector is a powerful tool that captures database changes and streams them to Kafka topics. We will configure the Debezium connector to connect to our MySQL databases, specify the tables we want to capture changes from, and map them to Kafka topics. This step will ensure that the connector captures the desired data for synchronization.

Credit: Source

At this point, we will consume the Kafka topics in our Laravel application using the Kafka consumer library. We will set up a consumer that retrieves the data stream from Kafka and processes it in real-time. With each event received from the Debezium connector, we will apply the necessary modifications to keep our synchronized database up to date.

First, we need to create a Laravel command for consuming Kafka events. Run the following command:

php artisan make:command KafkaConsumer

Next, register the command into the kernel.php file:

protected $commands = [
Commands\KafkaConsumer::class,
];

Now, let’s see what the KafkaConsumer command should look like:

<?php

namespace App\Console\Commands;

use App\Events\MovieDataReceived;
use Illuminate\Console\Command;
use Junges\Kafka\Contracts\KafkaConsumerMessage;
use Junges\Kafka\Facades\Kafka;

class KafkaConsumer extends Command
{
protected $signature = 'kafka:consume';
protected $description = 'Consume messages from Kafka topics';

public function handle()
{
$consumer = Kafka::createConsumer(['movies'])
->withHandler(function (KafkaConsumerMessage $message) {
event(new MovieDataReceived(json_encode($message->getBody())));
$this->info('Received message: ' . json_encode($message->getBody()));
})->build();

$consumer->consume();
}
}

For Kafka events, we dispatch a Laravel event that handles data modification between two databases. Create the MovieDataReceived event using the following command:

php artisan make:event MovieDataRecived
<?php

namespace App\Events;

use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class MovieDataReceived
{
use Dispatchable, SerializesModels;

public $data;

/**
* Create a new event instance.
*
* @return void
*/
public function __construct($data)
{
$this->data = $data;
}
}

Next, create a listener for the event to handle data modification and insertion into our local database using the following command:

php artisan make:listener SyncMovieData --event=MovieDataReceived

Register the event listener in the EventServiceProvider:

protected $listen = [
MovieDataReceived::class => [
SyncMovieData::class,
],
];

Here’s the modified SyncMovieData listener class:

<?php

namespace App\Listeners;

use App\Events\MovieDataReceived;
use App\Models\Movie;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Facades\Log;

class SyncMovieData implements ShouldQueue
{
/**
* Handle the event.
*/
public function handle(MovieDataReceived $event): void
{
// Extract the movie data from the event
$data = json_decode($event->data);

Movie::updateOrCreate(['movie_id' => $data->movie_id], [
'movie_id' => $data->movie_id,
'title' => $data->title,
'release_year' => $data->release_year,
'country' => $data->country,
'actors' => $data->actors,
'genres' => $data->genres
]);

Log::info('Movie data synchronized: ' . $data->movie_id);
}
}

To test this configuration and application, run the Docker environment using the following command:

docker compose up

After the Docker environment is up, send a CURL request to the Kafka connector to create a connector in Debezium for getting events from the MySQL database:

curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-debezium-orders-00/config \
-d '{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "42",
"database.server.name": "asgard",
"table.whitelist": "demo.movies",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.demo" ,
"decimal.handling.mode": "double",
"include.schema.changes": "true",
"transforms": "unwrap,dropTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex":"asgard.demo.(.*)",
"transforms.dropTopicPrefix.replacement":"$1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}'

Also, don’t forget to run the kafka:consume command to consume Kafka events in the Laravel application:

php artisan kafka:consume

Now visit http://localhost:9000 to access Kafka manager UI.

Conclusion

Integrating Apache Kafka and the Debezium connector into a Laravel application opens up a world of possibilities for real-time data synchronization. In this tutorial, we explored the step-by-step process of setting up the development environment, configuring Kafka and the Debezium connector, consuming Kafka topics in Laravel, and monitoring and handling errors. By harnessing the power of Kafka and Debezium, you can achieve seamless real-time data synchronization in your Laravel projects, empowering your application with the latest and most accurate data.

GitHub Repository

If you’re eager to dive deeper into this topic and start implementing real-time data synchronization in your Laravel projects, I invite you to check out the accompanying GitHub repository:

https://github.com/Balwant-Singh-Rathore/Laravel-Kafka-Event-Driven

This repository contains the complete code and configuration examples discussed in this tutorial.

For more updates on the latest tools and technologies, follow the Simform Engineering blog.

Follow Us: Twitter | LinkedIn

--

--

Balwant Singh
Simform Engineering

Software Engineer | Laravel | PHP | SQL | MySQL | Angular Js | jQuery | Database