Integrating Apache Kafka in Laravel: Real-time Database Synchronization with Debezium Connector
Empower your Laravel application with the latest and most accurate data by leveraging real-time database synchronization.
In today’s fast-paced digital world, real-time data synchronization is crucial for many web applications. In this tutorial, we will explore how to integrate Apache Kafka and the Debezium connector into a Laravel application to achieve efficient and reliable real-time data synchronization between two databases.
Prerequisites:
Laravel is installed on your local machine.
The rdkafka PHP extension is installed.
Docker is installed to create a local development environment.
Basic knowledge of Laravel, Docker, and Apache Kafka concepts.
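Before going further, it can help to confirm that the rdkafka extension is visible to the same PHP binary that runs Artisan. A minimal check using only built-in PHP functions might look like this (the file name check_rdkafka.php is just a suggestion):

```php
<?php
// check_rdkafka.php (hypothetical file name): run with `php check_rdkafka.php`.

// extension_loaded() reports whether the named extension is active
// for the PHP binary executing this script.
$loaded = extension_loaded('rdkafka');

if ($loaded) {
    // phpversion() returns false when no version string is available.
    $version = phpversion('rdkafka') ?: 'unknown';
    echo "rdkafka extension is loaded (version {$version})" . PHP_EOL;
} else {
    echo "rdkafka extension is NOT loaded; install and enable it first" . PHP_EOL;
}
```

If the CLI and your web server use different php.ini files, run the check in both contexts.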
Setting up the development environment
We begin by creating a local development environment with Docker. The Docker Compose file below defines the services we need: ZooKeeper, Kafka, Schema Registry, MySQL, Kafka Connect with Debezium, and Kafka Manager. Once the containers are up, the environment is ready for integration.
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:6.2.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_ENABLE: 'false'
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8081'
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'

  mysql:
    image: debezium/example-mysql:1.2
    container_name: mysql
    ports:
      - 3307:3306
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
    volumes:
      - ${PWD}/data/movies.sql:/docker-entrypoint-initdb.d/z99_dump.sql

  connect-debezium:
    image: debezium/connect:1.6
    container_name: connect-debezium
    depends_on:
      - kafka
      - mysql
      - schema-registry
    ports:
      - 8083:8083
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: connect-debezium
      CONFIG_STORAGE_TOPIC: docker-connect-debezium-configs
      OFFSET_STORAGE_TOPIC: docker-connect-debezium-offsets
      STATUS_STORAGE_TOPIC: docker-connect-debezium-status
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    volumes:
      - ${PWD}/scripts:/scripts

  kafka-manager:
    image: hlebalbau/kafka-manager:latest
    container_name: kafka-manager
    depends_on:
      - kafka
    ports:
      - "9000:9000"
    environment:
      - ZK_HOSTS=zookeeper:2181
      - APPLICATION_SECRET=letmein
The scripts and SQL file referenced by the compose file (the scripts directory and data/movies.sql, which initializes the MySQL database) are available in the GitHub repository linked at the end of this post. Feel free to clone it to your local environment and adapt the code and configuration to your needs.
Installing Kafka dependencies in Laravel
Next, we will install the Kafka library in our Laravel application using Composer. This library provides the necessary tools and classes to interact with the Kafka broker. We will also configure Laravel to establish a connection with the Kafka broker, specifying the required configuration details.
composer require mateusjunges/laravel-kafka
php artisan vendor:publish --tag=laravel-kafka-config
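One detail worth checking after publishing the config is the broker address. The compose file above advertises the broker to clients on the host machine at localhost:29092 (the PLAINTEXT_HOST listener), while containers inside the Docker network use kafka:9092. The sketch below illustrates that fallback logic in plain PHP. It assumes the broker list comes from an environment variable named KAFKA_BROKERS; verify the actual key names in the published config/kafka.php for your package version. The helper function is hypothetical and not part of the package.

```php
<?php
// Hypothetical helper: resolve the broker list from the environment,
// falling back to the host-facing listener defined in the compose file.
function resolveKafkaBrokers(): string
{
    // KAFKA_BROKERS is an assumed variable name; check config/kafka.php.
    $brokers = getenv('KAFKA_BROKERS');

    return (is_string($brokers) && $brokers !== '')
        ? $brokers
        : 'localhost:29092';
}

echo resolveKafkaBrokers() . PHP_EOL;
```

In a real Laravel project you would normally set the value in .env and let the package config read it, rather than calling a helper like this.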
Configuring the Debezium connector
The Debezium connector is a powerful tool that captures database changes and streams them to Kafka topics. We will configure the Debezium connector to connect to our MySQL databases, specify the tables we want to capture changes from, and map them to Kafka topics. This step will ensure that the connector captures the desired data for synchronization.
At this point, we will consume the Kafka topics in our Laravel application using the Kafka consumer library. We will set up a consumer that reads the data stream from Kafka and processes it in real time. For each event received from the Debezium connector, we apply the corresponding change so that our synchronized database stays up to date.
First, we need to create a Laravel command for consuming Kafka events. Run the following command:
php artisan make:command KafkaConsumer
Next, register the command in the app/Console/Kernel.php file:
protected $commands = [
    Commands\KafkaConsumer::class,
];
Now, let’s see what the KafkaConsumer command should look like:
<?php

namespace App\Console\Commands;

use App\Events\MovieDataReceived;
use Illuminate\Console\Command;
use Junges\Kafka\Contracts\KafkaConsumerMessage;
use Junges\Kafka\Facades\Kafka;

class KafkaConsumer extends Command
{
    protected $signature = 'kafka:consume';

    protected $description = 'Consume messages from Kafka topics';

    public function handle()
    {
        // Subscribe to the "movies" topic written by the Debezium connector.
        $consumer = Kafka::createConsumer(['movies'])
            ->withHandler(function (KafkaConsumerMessage $message) {
                // Encode the body once and pass it to the Laravel event.
                $payload = json_encode($message->getBody());

                event(new MovieDataReceived($payload));
                $this->info('Received message: ' . $payload);
            })
            ->build();

        // Blocks and handles messages as they arrive.
        $consumer->consume();
    }
}
For each Kafka message, we dispatch a Laravel event whose listener applies the change to the second database. Create the MovieDataReceived event using the following command:
php artisan make:event MovieDataReceived
<?php

namespace App\Events;

use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class MovieDataReceived
{
    use Dispatchable, SerializesModels;

    public $data;

    /**
     * Create a new event instance.
     *
     * @param  string  $data  JSON-encoded message body
     */
    public function __construct($data)
    {
        $this->data = $data;
    }
}
Next, create a listener for the event to handle data modification and insertion into our local database using the following command:
php artisan make:listener SyncMovieData --event=MovieDataReceived
Register the event listener in the EventServiceProvider:
protected $listen = [
    MovieDataReceived::class => [
        SyncMovieData::class,
    ],
];
Here’s the modified SyncMovieData listener class:
<?php

namespace App\Listeners;

use App\Events\MovieDataReceived;
use App\Models\Movie;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Facades\Log;

class SyncMovieData implements ShouldQueue
{
    /**
     * Handle the event.
     */
    public function handle(MovieDataReceived $event): void
    {
        // Extract the movie data from the event
        $data = json_decode($event->data);

        Movie::updateOrCreate(['movie_id' => $data->movie_id], [
            'movie_id' => $data->movie_id,
            'title' => $data->title,
            'release_year' => $data->release_year,
            'country' => $data->country,
            'actors' => $data->actors,
            'genres' => $data->genres
        ]);

        Log::info('Movie data synchronized: ' . $data->movie_id);
    }
}
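The listener above assumes every message decodes to an object with all six fields present. As a hedged sketch, the mapping could be pulled into a small pure function that tolerates a body arriving as a JSON string, an array, or an object, and that returns null when the payload cannot be used. The function name normalizeMoviePayload is hypothetical and not part of the tutorial's repository; the field names are taken from the listener.

```php
<?php
/**
 * Hypothetical helper: turn a consumed message body into the attribute
 * array the listener passes to Movie::updateOrCreate().
 * Returns null for an empty body, invalid JSON, or a missing movie_id.
 */
function normalizeMoviePayload($body): ?array
{
    // The body may be a raw JSON string or already-decoded data.
    if (is_string($body)) {
        $body = json_decode($body, true);
    } elseif (is_object($body)) {
        $encoded = json_encode($body);
        $body = $encoded === false ? null : json_decode($encoded, true);
    }

    if (!is_array($body) || !isset($body['movie_id'])) {
        return null;
    }

    // Keep only the columns the listener writes; absent ones become null.
    $fields = ['movie_id', 'title', 'release_year', 'country', 'actors', 'genres'];
    $attributes = [];
    foreach ($fields as $field) {
        $attributes[$field] = $body[$field] ?? null;
    }

    return $attributes;
}

// Illustrative payload, not real data from the movies table.
$sample = '{"movie_id": 1, "title": "Example", "release_year": 1999, "extra": "ignored"}';
print_r(normalizeMoviePayload($sample));
```

Inside the listener, a null result could simply be logged and skipped instead of raising an error on a missing property.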
To test this configuration and application, run the Docker environment using the following command:
docker compose up
After the Docker environment is up, send a curl request to the Kafka Connect REST API to create the Debezium connector that captures change events from the MySQL database:
curl -i -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/source-debezium-orders-00/config \
    -d '{
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "42",
        "database.server.name": "asgard",
        "table.whitelist": "demo.movies",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "dbhistory.demo",
        "decimal.handling.mode": "double",
        "include.schema.changes": "true",
        "transforms": "unwrap,dropTopicPrefix",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.dropTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.dropTopicPrefix.regex": "asgard.demo.(.*)",
        "transforms.dropTopicPrefix.replacement": "$1",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
    }'
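Once the connector is created, Kafka Connect exposes its state at GET /connectors/source-debezium-orders-00/status on the same port. As a rough sketch, the helper below inspects that JSON and reports whether the connector and all of its tasks are RUNNING. The function name connectorIsHealthy is hypothetical, and the sample response is illustrative of the response shape rather than captured output.

```php
<?php
/**
 * Hypothetical helper: given the JSON body of Kafka Connect's
 * connector status endpoint, return true only when the connector
 * and every task report the RUNNING state.
 */
function connectorIsHealthy(string $statusJson): bool
{
    $status = json_decode($statusJson, true);

    if (!is_array($status) || ($status['connector']['state'] ?? null) !== 'RUNNING') {
        return false;
    }

    $tasks = $status['tasks'] ?? [];
    if (!is_array($tasks) || $tasks === []) {
        // No tasks yet: treat the connector as not ready.
        return false;
    }

    foreach ($tasks as $task) {
        if (($task['state'] ?? null) !== 'RUNNING') {
            return false;
        }
    }

    return true;
}

// Illustrative response shape (values are made up for the example).
$sample = '{"name":"source-debezium-orders-00",'
    . '"connector":{"state":"RUNNING","worker_id":"connect-debezium:8083"},'
    . '"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect-debezium:8083"}],'
    . '"type":"source"}';

echo connectorIsHealthy($sample) ? "connector is healthy\n" : "connector is not healthy\n";
```

Against the running stack, the JSON could be fetched with file_get_contents('http://localhost:8083/connectors/source-debezium-orders-00/status') and passed to the same function.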
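Because the SyncMovieData listener implements ShouldQueue, a queue worker (php artisan queue:work) must also be running unless your queue connection is set to sync. Then run the kafka:consume command to consume Kafka events in the Laravel application: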
php artisan kafka:consume
Now visit http://localhost:9000 to access the Kafka Manager UI.
Conclusion
Integrating Apache Kafka and the Debezium connector into a Laravel application opens up a world of possibilities for real-time data synchronization. In this tutorial, we walked through setting up the development environment, configuring Kafka and the Debezium connector, consuming Kafka topics in Laravel, and writing the received changes to a local database. By harnessing Kafka and Debezium, you can keep the data in your Laravel projects synchronized in real time, empowering your application with the latest and most accurate data.
GitHub Repository
If you’re eager to dive deeper into this topic and start implementing real-time data synchronization in your Laravel projects, I invite you to check out the accompanying GitHub repository:
https://github.com/Balwant-Singh-Rathore/Laravel-Kafka-Event-Driven
This repository contains the complete code and configuration examples discussed in this tutorial.
For more updates on the latest tools and technologies, follow the Simform Engineering blog.