Apache Kafka Guide #11 Java API Simple Java Producer

Paul Ravvich
Apache Kafka At the Gates of Mastery
3 min read · Jan 6, 2024

Hi, this is Paul, and welcome to part 11 of my Apache Kafka guide. Today we will create a Java producer with kafka-clients, the Java client library of Apache Kafka.

Add the kafka-clients dependency

Maven:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.1</version>
</dependency>

Gradle:

implementation 'org.apache.kafka:kafka-clients:3.6.1'

We will now build our first producer. The steps are: set the producer properties, create the producer, send data, then flush and close the producer. We’ll go through each step in detail in this article; for now, the outline just shows the structure of what we’re building.

First we set `bootstrap.servers`, the address of the broker the producer connects to (`localhost:9092` here). Then we set `key.serializer` and `value.serializer`. Our producer receives keys and values as strings, and the serializers convert them into bytes before they are sent to Apache Kafka. Since both are strings, we use `StringSerializer` for both; `StringSerializer.class.getName()` supplies its fully qualified class name as the property value.

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", StringSerializer.class.getName());
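To make the serialization step concrete, here is a minimal JDK-only sketch of what a string serializer does: it turns a String into bytes (UTF-8 is StringSerializer’s default encoding) and passes null through unchanged. The class `Utf8StringCodec` and its methods are hypothetical names made up for this illustration; this is not Kafka’s own code, only an approximation of the observable behavior.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for illustration only; not Kafka's StringSerializer.
final class Utf8StringCodec {

    // String -> bytes, the direction a producer-side serializer works in.
    static byte[] serialize(String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    // bytes -> String, the direction a consumer-side deserializer works in.
    static String deserialize(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = serialize("Hello World");
        System.out.println(payload.length + " bytes, round trip: " + deserialize(payload));
    }
}
```

The point of the sketch is that Kafka itself only ever sees the byte arrays; the String types exist purely on the client side.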

Next, we create the producer: a KafkaProducer built from these properties. Its two type parameters are the key type and the value type, both String here, which matches the StringSerializer configured for the key and the value.

KafkaProducer<String, String> producer =
        new KafkaProducer<String, String>(properties);

Next, we create a ProducerRecord, which is the record that will be sent to Kafka. Its key type and value type are both String. The two-argument constructor used here takes the topic name and the value, so this record has no key (the key is null).

ProducerRecord<String, String> record =
        new ProducerRecord<String, String>("demo_topic", "Hello World");

Next, we send the data. The producer’s method for this is send(), which takes the ProducerRecord as its argument. The call is asynchronous: it returns right away, and the record is transmitted in the background.

producer.send(record);

Then we flush the producer by calling producer.flush(). This tells the producer to send all buffered data and blocks until that is finished. Unlike the asynchronous send(), it is a synchronous operation. After that, we can close the producer.

producer.flush();
producer.close();

Why do these two lines matter? Because send() is asynchronous, without them the program could finish before the producer had a chance to transmit the data to Kafka. Calling flush() pushes the buffered data out to Kafka and waits for it. close() also waits for previously sent records to complete before it shuts the producer down, so everything is flushed out before the program ends. In a real-world application, you would rarely call flush() yourself; you would close the producer before the program terminates. While the application runs and you keep calling send(), the producer keeps transmitting data in the background on its own.
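The interplay of an asynchronous send() with a blocking flush() and close() can be modeled with a small JDK-only toy. This is not how KafkaProducer is implemented (the real producer batches records in a buffer and ships them from a background I/O thread); the class `ToyAsyncSender` and all of its members are hypothetical names, and "delivery" here just means appending to an in-memory list. It is only meant to show why a program that exits right after send() may lose data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model for illustration only; not Kafka's implementation.
final class ToyAsyncSender implements AutoCloseable {

    // Single background thread standing in for the producer's I/O thread.
    // It is a daemon thread, so it does not keep the JVM alive on its own.
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor(task -> {
        Thread thread = new Thread(task, "toy-io");
        thread.setDaemon(true);
        return thread;
    });
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();
    private final List<String> delivered = new CopyOnWriteArrayList<>();

    // Asynchronous: hands the message to the background thread and returns at once.
    synchronized void send(String message) {
        inFlight.add(CompletableFuture.runAsync(() -> delivered.add(message), ioThread));
    }

    // Synchronous: blocks until every message handed over so far is delivered.
    synchronized void flush() {
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
        inFlight.clear();
    }

    // Waits for outstanding messages, then stops the background thread.
    @Override
    public void close() {
        flush();
        ioThread.shutdown();
    }

    // Snapshot of what has been delivered so far.
    List<String> delivered() {
        return List.copyOf(delivered);
    }

    public static void main(String[] args) {
        try (ToyAsyncSender sender = new ToyAsyncSender()) {
            sender.send("Hello World"); // returns immediately
            sender.flush();             // blocks until the message is delivered
            System.out.println(sender.delivered());
        } // close() runs here and waits for anything still outstanding
    }
}
```

In this toy, dropping both flush() and the try-with-resources block would let main() return while the message may still be sitting with the background thread, which is the same risk the paragraph above describes for the real producer.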

Full producer code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Producer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>("demo_topic", "Hello World");

        // Send data
        producer.send(record);

        // Tell producer to send all data and block until complete - synchronous
        producer.flush();

        // Close the producer
        producer.close();
    }
}

Don’t forget to create demo_topic before running the code.

Today we looked at the simplest Apache Kafka producer example.

In the next article, we’ll take a deep dive into how to use a producer callback.

