Power of Reactor + Kafka

👉 Modern microservices and their reactive nature

Ground Breakers: Java, Kafka, Reactor-Kafka.

📮 Imagine your post box. You don't need to stand next to it, and the postman doesn't need to shake your hand, for every piece of post you receive.

⏳ How does this business operate, then? The postman leaves the post inside the box. That turns the exchange into something comfortable for both parties: one is responsible for delivery, the other for collecting the post, and nothing goes missing (no worrying about post landing in a neighbor's box). If a post box has multiple organized stacks inside it, then technically it is a topic (the post box) with partitions (the stacks).

Making REST calls from one service to another is not a bad idea when a request has to be fulfilled on the spot.

But consider the following use cases:

👉 When multiple services require the same data to process

👉 When the downstream services are down

👉 When multiple nodes of the same application are running in a distributed environment.

👉 When a request requires an eventual fulfillment.

👉 When we need automatic retries with exponential backoff, and so on.

To support requirements like these, applications should adopt an event-driven mechanism rather than making REST calls to each other in all directions.

That is when services should react to messages. Technically, a service reacts only when there is a message for it. This isolation lets each service operate independently.

☔️ Understanding Reactor requires three major components:

  • MONO
  • FLUX
  • SCHEDULERS

🏃 → Mono:

When we need to emit at most one element and then run a chain of transformations on it, we should go with Mono.

Quick Example:

The caller of this method knows that you are returning a single element.

🏃 → Flux:

Similar to Mono, when we want to keep emitting multiple elements from a stream and run the transformations on each of them, we should go with Flux. A Flux can be unbounded, and we can stop it when needed.

Quick Example:

Observe the log with 2 second delay

🏃 → Schedulers: a power boost for Reactor.

It provides the factories below to create and schedule jobs on the fly, on which you emit and run the transformations.

  • Elastic - cached thread pools to run the jobs.
  • Immediate - executes a task on the caller thread only.
  • Parallel - a fixed pool of workers for parallel work.
  • Single - one reusable thread for all the work.

Observe the thread name [main] when the Immediate scheduler is in use.
Observe the thread names and timing. Parallel threads are in use.
Single: only one thread is used even though a parallel flux is in use.
Elastic.

The primary difference between elastic and parallel is easier to see with an understanding of ExecutorService from the JDK.

🌿 A sample ExecutorService looks like this:

ExecutorService e = Executors.newFixedThreadPool(4); ← This creates a single pool with 4 threads in it.

The Elastic Scheduler creates ExecutorServices dynamically and caches them, reusing a pool once its threads have finished executing their jobs.

The Parallel Scheduler, in contrast, hosts a fixed pool of single-threaded ExecutorServices.
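To make the distinction concrete, here is a minimal sketch that uses only the JDK, not Reactor's actual internals. PoolSketch and threadsUsed are hypothetical names chosen for illustration. It hands the same 20 short jobs to a fixed pool and to a cached pool and reports how many distinct threads ran them: the fixed pool can never use more than its 4 threads, while the cached pool creates threads on demand and reuses idle ones, so its count varies from run to run.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class PoolSketch {

    // Runs `jobs` short jobs on the pool and returns the names of the threads that executed them.
    static Set<String> threadsUsed(ExecutorService pool, int jobs) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < jobs; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // accept no new jobs, let the queued ones finish
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the pool", e);
        }
        return names;
    }

    public static void main(String[] args) {
        // Fixed pool: at most 4 worker threads, extra jobs wait in a queue.
        System.out.println("fixed pool threads:  " + threadsUsed(Executors.newFixedThreadPool(4), 20).size());
        // Cached pool: threads are created when needed and reused once idle.
        System.out.println("cached pool threads: " + threadsUsed(Executors.newCachedThreadPool(), 20).size());
    }
}
```

The fixed pool is the closer analogy to a bounded set of workers, and the cached pool to the create-and-reuse behaviour described for elastic.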

⛑ Time for hands-on. Enough knowledge to start coding…

  1. Download Confluent (open source): https://www.confluent.io/download/

  2. Unarchive it and go to the bin directory in your terminal.

  3. Run ./confluent start, then produce and consume messages as below:

🐎 Let's do the same from a Java application:

Basic Java libraries you will need:

compile group: 'io.projectreactor', name: 'reactor-core', version: '3.2.1.RELEASE'
compile group: 'io.projectreactor.kafka', name: 'reactor-kafka', version: '1.1.0.RELEASE'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.0.0'
compile group: 'io.projectreactor.addons', name: 'reactor-extra', version: '3.2.0.RELEASE'

🥀 Reactor Kafka explained: understanding the piece below helps you build most use cases in the field.

👽 scheduler = Schedulers.newElastic("FLUX_DEFER", 10, true);

☝️ ExecutorServices are created dynamically, as they are needed.

👽 ReceiverOptions<Integer, String> options = receiverOptions.subscription(Collections.singleton(topic))
        .addAssignListener(partitions -> log.info("Partitions Assigned {}", printPartitions(partitions)))
        .addRevokeListener(partitions -> log.info("Partitions Revoked {}", printPartitions(partitions)))
        //.assignment(Collections.singleton(new TopicPartition(topic, 9)))  // <-- ** EASY TEST ONLY **
        .commitInterval(Duration.ZERO)
        .commitBatchSize(0);

☝️ We subscribe to a single topic → Collections.singleton(topic)

☝️ A topic can have any number of partitions (the stacks inside a post box), as discussed at the beginning of this article. Printing them on the console shows which partitions this client is consuming from → partitions -> log.info("Partitions Assigned {}", printPartitions(partitions)). If you are not receiving events, another client on another machine is probably consuming them, depending on the partition the producer wrote to.

Note that the broker distributes a topic's partitions among the clients that share the same consumer group id, even when they run on different machines.

INFO  FluxDeferConsumer - Partitions Assigned [ --email.v1_1, --email.v1_2, --email.v1_0, --email.v1_3, --email.v1_4,  ]

The example log above shows the partitions assigned to the client.

👽.assignment(Collections.singleton(new TopicPartition(topic, 9)))

☝️ This line is commented out. It is useful when you want to test with the messages of a single partition, for example to start debugging in the early phase of development and testing. Without it, since we are using the elastic scheduler, threads are created per partition and processing happens in parallel, up to the number of assigned partitions and depending on the machine's CPU resources. It pays to be efficient at the level of a single machine before blindly scaling the application out in a distributed fashion.

👽.commitInterval(Duration.ZERO).commitBatchSize(0)

☝️ These settings play a major role: they tell the client not to wait for batch commits. :-) Reactor Kafka clients are smart enough to commit to the broker in bulk, by interval or by batch size, rather than individually for every event consumed. Setting both values to zero switches that batching off, so each offset is committed immediately. A useful mental model: we do not move on to the next event from a topic's partition until we have acknowledged that the current message was consumed successfully. That acknowledgement is what the simple term "commit" means.

👽Flux.defer(receiver::receive)

☝️ The events we receive are now emitted as part of a flux. Flux.defer is used here to work around a scheduler-related bug in Reactor. It also means receiver.receive() is called afresh on every subscription.

👽.groupBy(m -> m.receiverOffset().topicPartition())

☝️ This groups the events by the partition they come from, which keeps the events of each partition in order while they are processed.

👽.flatMap(partitionFlux ->

☝️ flatMap processes the per-partition fluxes asynchronously and in a non-blocking fashion, with no ordering across partitions.

👽partitionFlux.publishOn(scheduler)

☝️ Moves the processing of each partition's events onto a worker thread from the scheduler.

👽.concatMap(m -> consumeMessage(m)
        .thenEmpty(m.receiverOffset().commit())))

☝️ concatMap processes the events in order, one at a time. Since we chain thenEmpty onto the Mono, consumeMessage is expected to return a Mono, whether that is Mono.empty(), Mono.error() or Mono.just(). thenEmpty ignores its value, waits for it to complete and then runs the commit, so the result as a whole is a Mono<Void>. Look at the example code on GitHub.
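The ordering guarantee of the groupBy, flatMap, publishOn and concatMap steps can be pictured without Reactor at all. The sketch below is an analogy built on plain JDK executors, not what Reactor does internally. PartitionOrderSketch and process are hypothetical names, and events are simplified to {partition, offset} pairs. Each partition gets its own single-threaded lane, so lanes run in parallel while events inside a lane are handled strictly in arrival order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class PartitionOrderSketch {

    // Takes events as {partition, offset} pairs and returns, per partition,
    // the offsets in the order in which they were processed.
    static Map<Integer, List<Integer>> process(int[][] events) {
        Map<Integer, ExecutorService> lanes = new HashMap<>();        // touched by the caller thread only
        Map<Integer, List<Integer>> seen = new ConcurrentHashMap<>(); // written by the lane threads
        for (int[] event : events) {
            int partition = event[0];
            int offset = event[1];
            // Group by partition: one single-threaded lane per partition.
            ExecutorService lane = lanes.computeIfAbsent(partition, p -> Executors.newSingleThreadExecutor());
            // Within a lane, jobs run one after another in submission order.
            lane.execute(() -> seen.computeIfAbsent(partition, p -> new ArrayList<>()).add(offset));
        }
        for (ExecutorService lane : lanes.values()) {
            lane.shutdown();
            try {
                lane.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a lane", e);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        int[][] events = {{0, 1}, {1, 1}, {0, 2}, {1, 2}, {0, 3}};
        System.out.println(process(events));
    }
}
```

Which partition finishes first is not defined, but the offsets inside each partition always come out in the order they went in.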

👽.retryBackoff(numRetries, Duration.ofSeconds(2), Duration.ofHours(2))

☝️ This is the interesting part. When consumeMessage(m) returns Mono.error(), the error propagates to the parent flux, which retries up to the configured number of times, waiting between attempts with an exponential backoff that starts at 2 seconds and is capped at 2 hours.
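To get a feel for the waits this produces, here is a small sketch in plain Java that doubles the delay on each attempt and caps it at the maximum. It is an approximation under stated assumptions: BackoffSketch and delays are hypothetical names, and Reactor's own operator, as far as I know, also randomizes each delay (jitter), so real timings will differ.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

final class BackoffSketch {

    // Delay before each retry: first, 2 x first, 4 x first, ... never longer than max.
    // Jitter is deliberately left out to keep the numbers easy to follow.
    static List<Duration> delays(int numRetries, Duration first, Duration max) {
        List<Duration> result = new ArrayList<>();
        Duration next = first.compareTo(max) > 0 ? max : first;
        for (int attempt = 0; attempt < numRetries; attempt++) {
            result.add(next);
            Duration doubled = next.multipliedBy(2);
            next = doubled.compareTo(max) > 0 ? max : doubled;
        }
        return result;
    }

    public static void main(String[] args) {
        // Same bounds as the snippet above: start at 2 seconds, never wait longer than 2 hours.
        for (Duration d : delays(14, Duration.ofSeconds(2), Duration.ofHours(2))) {
            System.out.println(d.getSeconds() + "s");
        }
    }
}
```

With these bounds the delay reaches the 2 hour cap on the thirteenth retry, which is why the number of retries and the maximum backoff should be chosen together.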

👽.doOnError(err -> { handleError(err); }).retry()

☝️ This is another gem of Reactor programming. Once the retries are exhausted, the error is propagated so that we can handle it ourselves, say by committing the event: we tried enough times and for long enough, and would like to move on to the next event. Finally, instead of letting the flux terminate on the error, retry() restarts it, so consumption starts again from the next event.

👽.subscribe();

And finally, subscribe. Without this, the flux will not start emitting the events it receives from the partitions.

This shows the power of Reactor. In just a few lines we wrote a lot of functionality: retrying a set number of times over a set period, asynchronously on non-blocking threads, in parallel according to the system's resources, and restarting the consumer automatically, all in a grouped and ordered fashion. Imagine the effort of writing this ourselves; playing with threads is never easy, with deadlocks and race conditions lurking. So, be REACTIVE when needed.

💍 Below are sample Kafka consumer configurations for connecting to the broker.

reactor:
  kafka:
    configurations:
      bootstrap.servers: localhost:9092
      #retries: 3
      max.poll.interval.ms: 300000 # 5 minutes
      request.timeout.ms: 360000 # Join Group MaxTime
      session.timeout.ms: 20000
      max.poll.records: 10
      key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

An Example of Perfect Retry:

🚦Pitfalls:

  • When a request needs to be fulfilled synchronously, reactive or not, a crashed downstream system means the user's request cannot be served.
  • Also, when it is not necessary for the user to wait, there is no point in eventual request processing.
  • It brings in an additional dependency on a queuing mechanism. Though Kafka is highly reliable, it requires constant and heavy infrastructure maintenance.

Many thanks to Linda Zheng and Parvesh Kumar for reviewing and keeping the content simple.

Follow up with the Partition Rebalance problem, and how to solve it, here.

Find the publication here: https://medium.com/thinkspecial

Gopi Krishna Kancharla- Founder of http://allibilli.com
