Dmytro Nasyrov
Feb 2 · 6 min read
Kafka using Java.

Give us a message if you’re interested in Blockchain and FinTech software development or just say Hi at Pharos Production Inc.


Kafka is one of the most popular tools in high-load systems. A log-based real-time streaming system with absolutely amazing throughput is a perfect choice for micro-service-based services with thousands of thousand daily requests.

If you haven’t installed Kafka locally for development purposes, you can read how to do this in our previous article.

How to install Apache Kafka on MacOS

In the previous article, we have discussed how to install Kafka and ZooKeeper on MacOS. In this article, we will explore how to use Kafka using one of the most popular programming languages — Java. Current Kafka version works only on Java 8, but for our own project we can use whatever Java we like, but 8 or higher.


Project Setup.

Let’s set up a new project. We will use IntelliJ Idea as IDE, but you can choose whatever you like. We creating a new Maven project. Don’t select any archetype and choose your favorite version of Java — 11.0.1 in our case.

New Maven Project.

Next enter Group ID — your domain, Artifact ID — project name and required version. In our case it’s com.pharosproduction, tutorial-kafka-java and 1.0.

Project Identifiers.

Next, let’s enter the project name and other overall configuration settings.

Project Settings

Don’t forget to enable a really helpful feature of Idea IDE — Auto-Import.

Enable Auto-Import

In the Java directory, we will create 2classes:

  • Producer — data producer;
  • Consumer — data consumer.

Open Producer class and type psvm — great shortcut in Idea which means Public Static Void Main and press tab or return key. Now we have an entry point of the application.

psvm
Newly created main function.

Now let’s change Maven configuration. Open pom.xml and add build configuration which says to use Java 8 or higher as a strict requirement.

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>

Next, we should add Kafka and Logger dependencies.

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>

If you have enabled auto-import, Maven will sync with repositories on its own. Otherwise, on the right side of the IDE you see a Maven tab. Click it and run re-import by clicking on the circle arrows button.

Re-import dependencies.

The final configuration file should look like this

Maven config file.

Producer.

We’re ready to write our first producer. In Producer class let’s define global variables. There are only two:

  • producer itself;
  • logger.
Private Producer’s variables.

Here we say that producer should receive a string key and a string value.

Let’s define a private method for creating producer properties and setting up the producer itself. We need to define server IP, key and value serializers. That’s all for now. Bootstrap Server — is an IP of our Kafka broker.

Producer properties.

Let’s add a constructor. We bypass bootstrap server to properties method here and instantiate a new KafkaProducer object.

Producer constructor.

One of two public methods we defining is a put method. It will, well, put a value with a corresponding key to Kafka log.

In the put method, we define a new ProducerRecord which take Kafka's topic, key and value. We send key with a value to Kafka asynchronously in the method send. Also, send has a callback with recorded metadata and exception in case of a failure, which we happily log with a logger instance.

Do you see a small method get at the end of put method? Don’t use it in production. This method makes sending a synchronous operation. We use it just for the sake of clarity in a demo application.

Put method.

The final method in Producer is a close method, to close a connection with Kafka.

Close method.

To try our producer let’s add a simple use case to its main method.

Producer’s main method

We can run the main method just by clicking on a green play button and clicking Run it.

Run main method in Producer.

In the run-log, you should see something like this. Here you can find producer initialization, two values sent to Kafka and their corresponding partitions, offsets, and timestamps. And the final statement about the closing of a producer’s connection.

Run log

Consumer.

In consumer, we’re starting with global variables too. We define a logger, bootstrap server, group identifier, and topic.

Consumer global variables.

Consumer construct just assigns objects to this variables.

Consumer constructor.

And now the most interesting part of the tutorial. We define a new inner class nested from Runnable. Guess why? Because we want to run consumer in a separate thread.

Consumer Runnable.

In a similar fashion to a producer, we define consumer’s properties. Here we need a bootstrap server IP, group identifier, deserializer for keys and values and we define an auto-offset reset to earliest value in a topic.

Consumer properties.

A constructor of the inner class should look like this. We bypass here all our old friends — server IP, group ID, and topic, together with a CountDownLatch. Also, we subscribing the consumer to the topic.

CoundDownLatch is a kind of barrier, you can find more here:

CountDownLatch Docs

A constructor of Consumer Runnable.

The Run function looks like the code below. Here we define a polling mechanism to fetch data from a topic every 100 milliseconds. Then we read newly fetched records or raise an exception to shut down the connection.

Run function

The Shutdown function is simple

Shutdown.

To run all these things we define a run method in the outer class. Here we create a latch, create a new runnable, create and start a new thread. Also, we define a shutdown hook to intercept shutdown event of the consumer also in a separate thread.

Run public method.

Similarly to Producer, let’s define a main method for the Consumer.

Consumer’s main method.

When you will run both mains, two run-logs should show you a communication via Kafka broker.

Consumer’s log.
Producer’s log.

The source code is available in our Github repo:

Tutorial Kafka Using Java Repo


To know more about Kafka and how to use Kafka with Java, click on links below

Apache Kafka. MacOS installation guide.

Kafka Using Java. Part 2.


Thanks for reading!

Pharos Production

Software Development Company. Fintech and Blockchain. Enterprise Solutions.

Dmytro Nasyrov

Written by

CTO and Founder@ Pharos Production Inc. — Blockchain and Fintech Software Development Company.

Pharos Production

Software Development Company. Fintech and Blockchain. Enterprise Solutions.

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade