Dealing with long-running jobs using Apache Kafka

Five approaches for performing long-running jobs with examples in Java (Spring)


Problem

The consumer loop below processes events sequentially: the next iteration, and with it the next poll, only happens after the long-running job is done. If the job takes longer than max.poll.interval.ms (five minutes by default), the consumer is deemed to have failed, leaves the group, and a rebalance is triggered.

while (true) {
  event = poll();
  // since this happens sequentially, the next iteration & poll call
  // only happen after the method is done
  runLongRunningJob(event);
  commitOffset();
}
Figure 1: Initial state of the system
Figure 2: Consumer A takes too long processing the event and leaves the group
Figure 3: Consumer B takes over, and the cycle continues

Solutions

  • Optimising the code
  • Increase the timeout
  • Fire-and-forget threading
  • Pause consumer with threading and callbacks
  • Split the job into microprocesses

Optimising the code

The most obvious approach comes first: profile the job and make it faster, so that it reliably completes within the default five-minute poll interval. When that is not possible, one of the approaches below is needed.

Increase the timeout

# the default is 300000 (5 minutes); raise it to fit your job, e.g. 10 minutes
spring.kafka.consumer.properties.max.poll.interval.ms=600000
Pros

  • Code is not affected; it's pure configuration
  • Excels when a job takes more than 5 minutes but is still predictably fast

Cons

  • If the threshold is not known up front, it's just (educated) guessing
  • Has an impact on the rebalancing of consumers, especially when the configuration is set to a high value

When to use

  • The maximum runtime of the job is predictable
  • The runtime of the job is not much longer than the default

Fire-and-forget threading

@EnableAsync // on a configuration class, to enable Spring's async support

@Async
public void run(String event) {
  // some long running job
}
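To make the snippet concrete, here is a minimal sketch of how the pieces could be wired together; the class names (AsyncConfig, LongRunningJob, EventConsumer) and the topic name "jobs" are assumptions for illustration, not taken from the article's project:

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Service;

@Configuration
@EnableAsync // enables Spring's async method execution
class AsyncConfig {
}

@Service
class LongRunningJob {

  @Async // runs on Spring's task executor, off the consumer thread
  public void run(String event) {
    // some long running job
  }
}

@Service
class EventConsumer {

  private final LongRunningJob longRunningJob;

  EventConsumer(LongRunningJob longRunningJob) {
    this.longRunningJob = longRunningJob;
  }

  @KafkaListener(topics = "jobs") // hypothetical topic name
  public void handleEvent(String event) {
    // returns immediately, so the offset is committed while the job still runs
    longRunningJob.run(event);
  }
}

Note that @Async only kicks in when the method is called through the Spring proxy, which is why the job lives in its own bean rather than inside the listener class.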
Pros

  • Minimal code changes
  • High concurrency
  • Works regardless of how long the job takes

Cons

  • Resources become scarce when lots of long-running jobs are triggered
  • Error handling, like retries, becomes more complex, as this logic must be added to the processing logic

When to use

  • I/O-intensive jobs that don't take a lot of resources
  • Jobs can run independently of one another
  • Unpredictable/long runtimes
  • There is an orchestrator to keep track of success/error/retries (all flow-wise logic)

Pause consumer with threading and callbacks

while (true) {
  event = poll();
  consumer.pause(topicPartition);
  thread.runLongRunningJob(event)
    .callbackSuccess(resume(topicPartition))
    .callbackError(handleError());
  commitOffset();
}
@KafkaHandler
public void handleEvent(@Payload String event) {
  log.info("Handling the event with body {} the pause container way", event);
  // pauses the container
  containerSupportMethods.pauseConsume(containerId);
  executor.submitListenable(() -> longRunningJob.run(event))
    .addCallback(
      result -> {
        containerSupportMethods.resumeConsumer(containerId);
        log.info("Success callback");
      },
      ex -> {
        // perform retry mechanism like a dead letter queue here
        containerSupportMethods.resumeConsumer(containerId);
        log.warn("Error callback");
      });
}
private final KafkaListenerEndpointRegistry registry;

public void pauseConsume(String containerId) {
  getContainer(containerId).ifPresent(MessageListenerContainer::pause);
}

public void resumeConsumer(String containerId) {
  getContainer(containerId).ifPresent(MessageListenerContainer::resume);
}

private Optional<MessageListenerContainer> getContainer(String containerId) {
  return Optional.ofNullable(registry.getListenerContainer(containerId));
}
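One thing the snippets leave implicit is where containerId comes from: it has to match the id of the listener container that should be paused. A minimal sketch, where the id "pause-container" and topic "jobs" are assumed names:

import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@KafkaListener(id = "pause-container", topics = "jobs") // hypothetical names
public class PauseContainerHandler {

  // must match the id in the @KafkaListener annotation above
  private static final String CONTAINER_ID = "pause-container";

  @KafkaHandler
  public void handleEvent(String event) {
    // pause CONTAINER_ID, submit the job, resume in the callbacks,
    // as shown in the snippet above
  }
}

The registry.getListenerContainer(containerId) lookup in getContainer then resolves the container by that id.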
Pros

  • Stable, even for resource-intensive (CPU/memory) jobs
  • The callback can be used for error handling, retries, etc., making it suitable for choreography patterns as well
  • Retains the natural queuing behavior of Kafka partitions

Cons

  • Requires more code changes than the previous approaches
  • Limited/no concurrency, which can result in longer overall processing time

When to use

  • Unpredictable or very long runtimes
  • CPU/memory-intensive jobs
  • The order of job execution matters

Split the job into microprocesses

// the monolithic version: one consumer performs every step sequentially
while (true) {
  event = poll();
  resultA = performA(event);       // max 4 minutes
  resultB = performB(resultA);     // max 4 minutes
  finalResult = performC(resultB); // max 4 minutes
}
// consumer 1: performs step A and publishes the intermediate result
while (true) {
  event = poll();
  resultA = performA(event); // max 4 minutes
  produce(resultA);
}

// consumer 2: performs step B
while (true) {
  resultA = poll();
  resultB = performB(resultA); // max 4 minutes
  produce(resultB);
}

// consumer 3: performs step C
while (true) {
  resultB = poll();
  finalResult = performC(resultB); // max 4 minutes
}
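As a sketch of what a single stage could look like in Spring, assuming String payloads and the hypothetical topic names "step-a-results" and "step-b-results":

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class StepBConsumer {

  private final KafkaTemplate<String, String> kafkaTemplate;

  public StepBConsumer(KafkaTemplate<String, String> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  @KafkaListener(topics = "step-a-results") // hypothetical topic name
  public void handle(String resultA) {
    String resultB = performB(resultA); // max 4 minutes
    kafkaTemplate.send("step-b-results", resultB);
  }

  private String performB(String resultA) {
    // this stage's actual unit of work
    return resultA;
  }
}

Each stage is an ordinary, fast-enough consumer again, so the default poll interval applies without any tuning.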
Pros

  • Individual stages can be deployed and scaled separately
  • Maintainability of the individual stages is likely to go up (due to their limited scope)

Cons

  • The overall complexity of your jobs is likely to increase due to the separation of code
  • The process needs to be coordinated, which adds (even more) complexity or components to the system
  • If one (or more) of the parts can't be scoped granularly enough to fit the default 5 minutes, a different solution is needed for that specific case

When to use

  • The job is splittable into smaller units of work
  • The job is critical enough to the system to justify the added complexity

Some tips to take along

Partitioning strategy

The partition an event lands on is determined by its key: events with the same key always end up on the same partition. Pick a key that spreads long-running jobs evenly across partitions, so that one slow partition does not hold up a disproportionate share of the work while other consumers sit idle.
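For example, a keyed send; the topic "jobs" and the idea of keying by customer id are illustrative assumptions:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class JobProducer {

  private final KafkaTemplate<String, String> kafkaTemplate;

  public JobProducer(KafkaTemplate<String, String> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  public void publish(String customerId, String payload) {
    // the key determines the partition: same customer, same partition,
    // so per-customer ordering is kept while load spreads across partitions
    kafkaTemplate.send("jobs", customerId, payload);
  }
}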

Limit number of events consumed per poll

spring.kafka.consumer.max-poll-records=1
By default a single poll can return a whole batch of events, all of which must be processed within one poll interval. Polling one event at a time means the interval only has to cover a single job, and a pause takes effect before the next event is fetched.

Use manual commits

spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
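With MANUAL_IMMEDIATE the offset is only committed when the listener explicitly acknowledges, so an event is never marked as processed before its job has finished. A minimal sketch; the listener id and topic name are assumptions:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class ManualAckConsumer {

  @KafkaListener(id = "manual-ack", topics = "jobs") // hypothetical names
  public void handleEvent(String event, Acknowledgment ack) {
    runLongRunningJob(event);
    // commit only after the job succeeded; a crash mid-job means the event
    // is redelivered instead of silently lost
    ack.acknowledge();
  }

  private void runLongRunningJob(String event) {
    // the actual work
  }
}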

Wrapping it up

Each approach is worked out in a separate module of the accompanying example project:

  • Not-async: the problem illustrated; also contains the configuration of the first solution
  • Spring-async: fire-and-forget threading
  • Pause-container: pause consumer with threading and callbacks
  • Pause-container-with-acknowledge: pause consumer with threading and callbacks, combined with the tips of manual commits and limiting the number of events polled
  • Microprocesses: split the job into microprocesses

Xavyr Rademaker

Software engineer at Deloitte with an interest in subjects including, but not limited to, event-driven microservices, Kubernetes, and AWS.