Kafka is one of the most popular tools in high-load systems. A log-based, real-time streaming platform with very high throughput, it is an excellent choice for microservice-based systems that handle millions of requests per day.
If you haven’t installed Kafka locally for development purposes, you can read how to do this in our previous article.
In the previous article, we discussed how to install Kafka and ZooKeeper on macOS. In this article, we will explore how to use Kafka from one of the most popular programming languages: Java. The Kafka version we installed runs on Java 8, but our own project can use any Java version, as long as it is 8 or higher.
Let’s set up a new project. We will use IntelliJ IDEA as our IDE, but you can choose whatever you like. Create a new Maven project, don’t select any archetype, and choose your favorite version of Java (11.0.1 in our case).
Next, enter the Group ID (your domain), the Artifact ID (the project name), and the version. In our case, these are com.pharosproduction, tutorial-kafka-java, and 1.0.
Next, let’s enter the project name and other overall configuration settings.
Don’t forget to enable Auto-Import, a really helpful feature of IntelliJ IDEA.
In the Java source directory (src/main/java), we will create two classes:
- Producer — data producer;
- Consumer — data consumer.
Open the Producer class, type psvm (a handy IDEA shortcut that expands to public static void main) and press Tab or Return. Now we have an entry point for the application.
Now let’s change the Maven configuration. Open pom.xml and add a build configuration that makes Java 8 or higher a strict requirement.
Next, we should add the Kafka client and logger dependencies.
If you have enabled Auto-Import, Maven will sync with the repositories on its own. Otherwise, open the Maven tab on the right side of the IDE and click the circular-arrows button to re-import.
The final configuration file should look like this
We’re ready to write our first producer. In the Producer class, let’s define the class fields. There are only two:
- the producer itself;
- a logger.
The producer is declared as KafkaProducer<String, String>, which says that it sends records with a string key and a string value.
Let’s define a private method that creates the properties used to set up the producer. We need to define the bootstrap server plus the key and value serializers. That’s all for now. The bootstrap server is the address (IP and port) of our Kafka broker.
Let’s add a constructor. Here we pass the bootstrap server to the properties method and instantiate a new KafkaProducer object.
The first of the two public methods we define is put. It will, well, put a value with its corresponding key into the Kafka log.
In the put method, we create a new ProducerRecord, which takes the Kafka topic, the key, and the value. We then send the record to Kafka asynchronously with the send method. send also accepts a callback that receives the record metadata, or an exception in case of a failure, which we happily log with our logger.
Notice the small get() call at the end of the put method? Don’t use it in production. It blocks until the send completes, which turns sending into a synchronous operation. We use it here only for the sake of clarity in a demo application.
The last method in Producer is close, which closes the connection to Kafka.
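As a minimal sketch of this properties method, the snippet below builds the three settings with plain java.util.Properties and Kafka’s standard producer config key strings, so it compiles even without kafka-clients on the classpath. The names ProducerProps and producerProps are hypothetical, and 127.0.0.1:9092 in the usage note assumes a local broker on the default port.

```java
import java.util.Properties;

// Hypothetical helper mirroring the producer's private properties method.
// The key strings are Kafka's standard producer config names; with kafka-clients
// available, ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KEY_SERIALIZER_CLASS_CONFIG
// and VALUE_SERIALIZER_CLASS_CONFIG hold the same strings.
class ProducerProps {
    // Fully qualified name of Kafka's built-in String serializer.
    static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";

    static Properties producerProps(String bootstrapServer) {
        Properties props = new Properties();
        // host:port of the broker used for the initial connection.
        props.setProperty("bootstrap.servers", bootstrapServer);
        // Keys and values are plain strings in this tutorial.
        props.setProperty("key.serializer", STRING_SERIALIZER);
        props.setProperty("value.serializer", STRING_SERIALIZER);
        return props;
    }
}
```

With kafka-clients on the classpath, the constructor could then create the client with new KafkaProducer<String, String>(ProducerProps.producerProps("127.0.0.1:9092")).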
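Kafka’s send(record, callback) returns a Future<RecordMetadata> right away and later invokes the callback with either the metadata or an exception; calling get() on that future blocks until the send completes. To show that contrast without a running broker, here is a small analogy built on CompletableFuture. It is not Kafka code: AsyncSendSketch, Metadata, and its send method are hypothetical stand-ins, and a single background thread plays the role of the producer’s I/O thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

// Broker-free analogy of KafkaProducer.send(record, callback); not Kafka API.
class AsyncSendSketch {
    // Stand-in for RecordMetadata: where the record was written.
    static final class Metadata {
        final String topic;
        final int partition;
        final long offset;

        Metadata(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // One daemon thread plays the producer's background I/O thread.
    private final ExecutorService io = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sender");
        t.setDaemon(true);
        return t;
    });
    private long nextOffset = 0; // only touched on the sender thread

    // Returns immediately; the "write" and the callback run on the sender thread.
    // The returned future completes only after the callback has run.
    // The key is unused in this single-partition sketch.
    CompletableFuture<Metadata> send(String topic, String key, String value,
                                     BiConsumer<Metadata, Throwable> callback) {
        return CompletableFuture.supplyAsync(() -> {
            if (value == null) {
                throw new IllegalArgumentException("value must not be null");
            }
            return new Metadata(topic, 0, nextOffset++);
        }, io).whenComplete(callback);
    }

    void close() {
        io.shutdown();
    }

    public static void main(String[] args) {
        AsyncSendSketch producer = new AsyncSendSketch();
        BiConsumer<Metadata, Throwable> logIt = (md, ex) -> {
            if (ex == null) {
                System.out.println("sent to " + md.topic + "-" + md.partition + " @ offset " + md.offset);
            } else {
                System.out.println("send failed: " + ex);
            }
        };
        // Asynchronous: returns at once, the callback logs later.
        producer.send("demo-topic", "key1", "value1", logIt);
        // Blocking, like send(...).get() in the demo (join() here avoids checked exceptions).
        Metadata md = producer.send("demo-topic", "key2", "value2", logIt).join();
        System.out.println("blocking send returned offset " + md.offset);
        producer.close();
    }
}
```

The point of the analogy is the cost of blocking: each join() (or get() in Kafka) makes the caller wait for a full round trip, while the callback style lets the caller keep sending.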
To try our producer let’s add a simple use case to its main method.
We can run the main method just by clicking the green play button next to it and choosing Run.
In the run log, you should see something like this. It shows the producer initialization, the two values sent to Kafka with their partitions, offsets, and timestamps, and a final statement that the producer’s connection was closed.
In Consumer, we start with fields too. We define a logger, the bootstrap server, the group identifier, and the topic.
The Consumer constructor just assigns its arguments to these fields.
And now the most interesting part of the tutorial. We define a new inner class that implements Runnable. Guess why? Because we want to run the consumer in a separate thread.
As with the producer, we define the consumer’s properties. Here we need the bootstrap server IP, the group identifier, and deserializers for keys and values. We also set the auto-offset reset to earliest, so a consumer group with no committed offset starts reading from the beginning of the topic.
The constructor of the inner class should look like this. We pass in all our old friends (server IP, group ID, and topic) together with a CountDownLatch. We also subscribe the consumer to the topic.
CountDownLatch is a kind of barrier: it lets one or more threads wait until a set of operations in other threads has completed. You can find more here:
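Here is a similar minimal sketch of the consumer’s properties, again using plain java.util.Properties and Kafka’s standard consumer config key strings so it compiles without kafka-clients. ConsumerProps and consumerProps are hypothetical names, not the tutorial’s code.

```java
import java.util.Properties;

// Hypothetical helper mirroring the consumer's properties method.
// With kafka-clients available, ConsumerConfig.GROUP_ID_CONFIG,
// AUTO_OFFSET_RESET_CONFIG, etc. hold the same key strings.
class ConsumerProps {
    // Fully qualified name of Kafka's built-in String deserializer.
    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    static Properties consumerProps(String bootstrapServer, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServer);
        // Consumers sharing a group.id split the topic's partitions between them.
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer", STRING_DESERIALIZER);
        props.setProperty("value.deserializer", STRING_DESERIALIZER);
        // If the group has no committed offset, start from the oldest record.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}
```

With kafka-clients on the classpath, these properties would go to new KafkaConsumer<String, String>(props), followed by consumer.subscribe(Collections.singletonList(topic)).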
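As a small JDK-only illustration (not taken from the tutorial’s code), the calling thread below waits on a CountDownLatch until every worker thread has counted it down once; await() returns only when the count reaches zero. LatchDemo and runWorkers are illustrative names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

class LatchDemo {
    // Starts `workers` threads, waits until all of them have finished,
    // and returns the names they recorded.
    static List<String> runWorkers(int workers) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(workers);
        List<String> finished = new CopyOnWriteArrayList<>();
        for (int i = 0; i < workers; i++) {
            String name = "worker-" + i;
            new Thread(() -> {
                try {
                    finished.add(name); // the "work"
                } finally {
                    done.countDown(); // always release the latch, even on failure
                }
            }, name).start();
        }
        done.await(); // blocks until the count reaches zero
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runWorkers(3).size() + " workers finished");
    }
}
```

Counting down in a finally block matters: if a worker failed without counting down, the waiting thread would block forever.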
The run method looks like the code below. Here we poll the topic in a loop, where each poll call waits up to 100 milliseconds for new data. We then read the newly fetched records, until a shutdown request interrupts polling with an exception and we close the connection.
The shutdown method is simple.
To run all of this, we define a run method in the outer class. Here we create a latch, create a new runnable, and create and start a new thread. We also register a shutdown hook, which the JVM runs in a separate thread when it shuts down, so the consumer can be stopped cleanly.
As with the Producer, let’s define a main method for the Consumer.
When you run both main methods, the two run logs should show the producer and consumer communicating through the Kafka broker.
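The threading pattern described above (a Runnable that polls in a loop, a shutdown method, a latch counted down when the loop exits, and a shutdown hook) can be sketched without a broker. In this hypothetical version, a BlockingQueue stands in for KafkaConsumer: queue.poll(100, MILLISECONDS) plays the role of consumer.poll(Duration.ofMillis(100)), and a volatile flag replaces consumer.wakeup() and the exception it causes. PollingWorker and its methods are illustrative names, not the tutorial’s code.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class PollingWorker implements Runnable {
    private final BlockingQueue<String> source; // stand-in for the subscribed topic
    private final CountDownLatch latch;
    final List<String> processed = new CopyOnWriteArrayList<>();
    private volatile boolean running = true;

    PollingWorker(BlockingQueue<String> source, CountDownLatch latch) {
        this.source = source;
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            while (running) {
                // Wait up to 100 ms for a record; null means nothing arrived.
                String record = source.poll(100, TimeUnit.MILLISECONDS);
                if (record != null) {
                    processed.add(record);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // A real consumer would call consumer.close() here.
            latch.countDown(); // signal that the worker has stopped
        }
    }

    // A real consumer would call consumer.wakeup(); here we just end the loop.
    void shutdown() {
        running = false;
    }

    // Mirrors the outer run method: start the worker thread, register a shutdown hook.
    static PollingWorker start(BlockingQueue<String> source, CountDownLatch latch) {
        PollingWorker worker = new PollingWorker(source, latch);
        new Thread(worker, "consumer-thread").start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            worker.shutdown();
            try {
                latch.await(); // let the loop exit cleanly before the JVM stops
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        return worker;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        CountDownLatch latch = new CountDownLatch(1);
        PollingWorker worker = start(topic, latch);
        topic.add("hello");
        topic.add("world");
        Thread.sleep(300); // give the worker time to drain the queue
        worker.shutdown();
        latch.await();
        System.out.println("processed: " + worker.processed);
    }
}
```

The latch is what makes shutdown orderly: the hook does not return until the polling loop has finished and released its resources.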
The source code is available in our GitHub repo:
Thanks for reading!