Dealing with long-running jobs using Apache Kafka

Five approaches for performing long-running jobs with examples in Java (Spring)

Xavyr Rademaker
CodeX
16 min read · Jul 13, 2021


Photo by Kevin Ku on Unsplash

As engineers, we always aim to use the right tool for the right job, striving for excellence. However, sometimes we face a situation where we have delivery demands and we just must make it work with the tools available. While that is often a somewhat frustrating experience, it’s also a learning experience that leads to deeper and better understanding.

This is exactly what happened on one of my recent projects. We used an event-driven microservice architecture mostly using Java with Spring for our microservices and Apache Kafka as our event broker. At some point we needed to add long running jobs to our landscape. Given the time pressure we were under, we decided not to add new components to our landscape at that point in time, and so our long running jobs turned into new microservices that consume events from Kafka and start their processing.

Even though all seemed to go well at first, we started to run into issues when we increased the number of replicas for our microservices. These issues had everything to do with the default configurations of Kafka consumers, which are built for many fast jobs instead of few long running ones. After doing some research, we managed to get it to work without changing the tools in our landscape (and without any delays to the overall planning). In this blog I aim to show you the issues we encountered, and the various solutions we implemented, based on the different types of jobs.

With this blog, I am not trying to encourage you to use Apache Kafka as a message broker for long-running jobs. Its strengths lie in handling lots of events in a fast-streaming type of system with small, fast jobs/processes that get triggered often. If you are in a situation that requires long-running jobs, I would advise looking for an alternative solution that is optimized for parallelized long-running jobs. Instead, this blog aims to show you how to deal with long-running jobs using Apache Kafka with five different approaches, in case you are in one of those rare scenarios where there's simply no time/budget to get another tool into your landscape.

Before we get started, some final remarks. This blog expects at least some level of familiarity with the basics of Apache Kafka and the concepts of peer-to-peer choreography and orchestration, as I will be referring to some of those concepts throughout the problem and solutions. I have also created a repository with some very basic examples of all the solutions discussed in this blog, which can be found here.

Problem

Kafka consumers are, by default, configured to poll at least once every 5 minutes. If a consumer fails to poll within 5 minutes of its last poll, it is marked as dead and leaves the consumer group. When using a Spring Kafka consumer, it will poll continuously on the consumer thread after booting up, so you don't have to do anything for this to work. Once an event is polled, the system starts processing it on the consumer thread until it's finished. This means that by default, the consumer won't be able to poll again until it's done processing (since the consumer thread is busy processing).

Below you can find a small piece of pseudo code to explain the consumer processing a bit better. Please note that with long-running jobs I refer to anything that takes longer to process than the default of 5 minutes.
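Simplified, the consumer thread effectively behaves like the pseudo code below, where process() stands in for the long-running job:

    while (true) {
        // poll for new events on the consumer thread
        records = consumer.poll(timeout);
        for (record in records) {
            // processing happens on this same thread,
            // so the next poll only happens once processing is finished
            process(record);
        }
        // if this loop body takes longer than max.poll.interval.ms (default 5 minutes),
        // the consumer is considered dead and leaves the group
    }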

Now that we understand the default behavior, let’s illustrate the scenario where things go wrong. In the following example we have a consumer group with 2 consumers in it, consumer A and consumer B. The consumers listen to topic T1 which has 2 partitions TP1 & TP2, meaning that each consumer has 1 partition assigned: consumer A has TP1, and consumer B has TP2. Furthermore, there is 1 new event on partition TP1 called ETP1. This initial situation is illustrated in Figure 1.

Figure 1: Initial state of the system

Consumer A just consumed event ETP1 and started processing. Processing this event takes approximately an hour, so it just starts with it. After 5 minutes of processing (and thus not polling) consumer A gets marked as dead and leaves the group. These steps are illustrated in Figure 2.

Figure 2: Consumer A takes too long processing the event and leaves the group

This triggers a rebalance and the partitions get redistributed, assigning both TP1 and TP2 to consumer B. As consumer A never committed its offset, consumer B will consume event ETP1 as well and start processing. This too will take longer than 5 minutes, so we can probably guess what the future holds for this consumer. Figure 3 visualizes this process.

Figure 3: Consumer B takes over, and the cycle continues

Besides the fact that every consumer that consumes ETP1 will have the misfortune of leaving the group after 5 minutes, all of them remain busy processing the event even after leaving the group. This means that event ETP1 will be processed twice in our example. Just imagine having 10 or 20 replicas; sooner or later all of them will be stuck processing the exact same event. This is, of course, a huge waste of time and resources and, maybe even worse, it can leave the system in an inconsistent state if the event processing isn't designed to be idempotent.

Solutions

Now that we have an idea of the problem that occurred, let’s look at several solutions to tackle it. This section will cover the following five different approaches to solving the polling issue:

  • Optimising the code
  • Increase the timeout
  • Fire-and-forget threading
  • Pause consumer with threading and callbacks
  • Split up the job into microprocesses

Keep in mind that even though the five solutions are described independently of one another, they aren’t necessarily mutually exclusive. Combining microprocesses with increased timeouts or fire-and-forget threading for example works just fine.

The examples given in this blog use either pseudo code or Java with Spring, but of course the solutions work in other languages as well. For example, we have also implemented some of them in Python.

Optimising the code

The first, and most obvious, solution is to optimize the code you have written. Sometimes using the right datatypes can save a ton of time. For example, looping through a list to find an object with id 123 scales linearly (O(n)) with the size of the list. Loading your data into a HashMap instead, using the ID as key, is likely to increase performance since HashMap lookups scale constantly (O(1)) on average.
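As a small illustration (the Item class and its getId() method are hypothetical):

    // O(n): scan the list on every lookup
    Item match = items.stream()
            .filter(item -> item.getId().equals("123"))
            .findFirst()
            .orElse(null);

    // O(1) on average: index the items by id once, then look them up directly
    Map<String, Item> itemsById = items.stream()
            .collect(Collectors.toMap(Item::getId, Function.identity()));
    Item fastMatch = itemsById.get("123");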

Besides using the right datatypes, also check whether there are redundant loops and/or calculations. If there aren't any redundant pieces of code, try some sanity checking of all the integrations as well. Some examples of questions to ask yourself: Are we loading data multiple times? Did we optimise our writes to the database (think bulk inserts instead of normal inserts/upserts)? Are our queries optimised and do we have indexes on the tables where we need them (or maybe too many indexes)? Can we combine certain computations in the same loop, preventing us from having to iterate over the same list again? All of these questions can lead to optimisations in our system.

Of course, this is a simplified scenario, and sometimes the code is as optimised as it can be, but the job just takes a long time to process. In such cases the other solutions discussed in this blog might help. However, this doesn’t take away that it’s always worth looking at whether the code can be optimised.

Increase the timeout

When I described the problem earlier, I mentioned that the default max poll interval is 5 minutes. So, a second solution is to increase the value of this configuration. The specific Kafka configuration to set for this is "max.poll.interval.ms", which in Spring can be set as follows (note that 300000 ms is the default of 5 minutes):
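Assuming a Spring Boot application with the auto-configured consumer factory, one way to set it is in application.properties (it can also be set programmatically on the consumer configuration):

    # default is 300000 ms (5 minutes); increase this value to fit your longest job
    spring.kafka.consumer.properties.max.poll.interval.ms=300000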

As simple as this solution might sound, there are some drawbacks to it. The biggest drawback is the fact that if things go wrong with your main processing thread (consumer thread), it will only be detected after the configured poll interval. If we set its value to 1 hour, it means that it will take 1 hour for the broker to realize something is wrong and trigger a rebalance. In a lot of cases 1 hour of doing nothing before recovering is not acceptable.

Secondly, how much time is enough? If you know that the process will always take at most 6 minutes, then upping the configuration might be feasible. However, in a lot of cases finding the right value becomes (educated) guesswork at best. Setting it too low means the same problem will occur after a longer period, while setting it too high means it will take longer to trigger a rebalance when things really are going wrong.

To summarize the pros and cons for this approach:

Pros:

  • Code is not affected; it's pure configuration
  • Excels when a job takes more than 5 minutes, but is still predictably fast

Cons:

  • If the threshold is not known up front it’s just (educated) guessing
  • Has an impact on rebalancing of consumers, especially when the configuration is set to a high value

Finally, some scenarios where you can use this approach to solve the problem:

  • When the max runtime of the job is predictable
  • When the runtime of the job is not much longer than the default

Fire-and-forget threading

The third approach on the list is fire-and-forget threading. Looking closely at this issue, we see that it lies in the fact that the long running jobs block the polling loop on the consumer thread, preventing it from polling again until the job is done. Now what happens if we offload the work to separate worker threads? In that case the polling loop just continues while the job runs.

Starting a job in a fire-and-forget manner using Spring is quite straightforward. Start by adding the following annotation to either your configuration class (if you have one) or your main class:
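For example, on the main class (the class name is illustrative):

    @SpringBootApplication
    @EnableAsync   // enables Spring's asynchronous method execution
    public class LongRunningJobApplication {

        public static void main(String[] args) {
            SpringApplication.run(LongRunningJobApplication.class, args);
        }
    }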

This enables Spring's asynchronous method execution capability. The second, and last, addition needed to our code is the following annotation on the public method that starts the long running job:
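For example (the service and method names are illustrative):

    @Service
    public class LongRunningJobService {

        @Async   // runs on a separate worker thread, so the consumer thread is not blocked
        public void startLongRunningJob(String event) {
            // the actual long running processing goes here
        }
    }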

Simply calling this method from your consumer will run it as a background job while the polling loop of the consumer continues without being interrupted. Spring will handle the thread management for you.

As you can see, this approach requires only minimal changes to the code base. The solution also works regardless of the processing time. The sheer ease of use, the minimal code changes and the fact that Spring handles the concurrency make this approach quite attractive.

However, there are some downsides to this approach as well. When your job is very resource intensive (high CPU/memory usage), running the job concurrently for multiple events means you risk running out of resources, which leads to either slow performance due to maxed-out CPU or, even worse, running out of memory.

Besides the resource concerns, dealing with failures also becomes more complex with this approach. Due to the fire-and-forget nature of this approach, the worker thread needs to contain its own resilience logic as well. This can be anything from calling itself on failure (as a retry mechanism) to producing an event to a dead letter topic (or to the original topic, putting it back on the queue) for later reprocessing.

Furthermore, since the consumer won't wait for the event to be processed before consuming the next event, we lose the queuing behavior of Kafka. This means that all events, even the ones on the same partition, must be able to be processed out of order, or an internal queue needs to be built in the microservice (to preserve the ordering of the Kafka partition), which increases complexity.

What I found works quite well with this approach is the orchestration strategy. Since the core of that strategy is to have an orchestrator that handles flow management, it can deal with the error flows of the fire-and-forget style long-running jobs. All the microservice needs to ensure is that it always produces its outcome back to the orchestrator, whether successful or not.

Summarizing this approach:

Pros:

  • Minimal code changes
  • High concurrency
  • Works regardless of how long the job takes

Cons:

  • Resources get scarce when lots of long-running jobs are triggered
  • Error handling such as retries becomes more complex, as this logic must be added to the processing logic

When to use this approach:

  • I/O intensive jobs that don’t take a lot of resources
  • Jobs can run independently of one another
  • Unpredictable/long runtimes
  • There is an orchestrator to keep track of success/error/retries (all flow wise logic)

Pause consumer with threading and callbacks

The fourth solution to the problem is to pause the consumer. This approach is also described in the documentation here (under detecting consumer failures).

When pausing a consumer, you typically pause specific topic partitions. The consumer will still poll the broker; however, it will simply not receive any new events from the paused topic partitions until it is resumed. The poll simply works as a health check. To ensure that the polling really continues, we still offload the work to a worker thread (since otherwise the polling loop would be blocked, as explained earlier). The difference is that we don't use fire-and-forget style processing; instead, we actively add a callback to the job which resumes the consumer again.

Code-wise, the pseudo code would look something like this:
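A rough sketch of the idea (the pause/resume and worker-thread helpers are placeholders):

    onEvent(event):
        pauseConsumer()                          // stop fetching new events, polling continues as a health check
        job = startJobOnWorkerThread(event)      // offload the long running work
        job.onSuccess(result -> resumeConsumer())
        job.onFailure(error -> {
            handleError(error)                   // retry / dead letter topic logic
            resumeConsumer()
        })
        // the consumer thread returns here, so the polling loop keeps running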

The offset will still be committed automatically (alternatives to this will be discussed later), but we have added a callback to the long-running job. In this callback we can also handle error scenarios (in Spring we can add both success and error callbacks), where we can add our retry or dead letter topic logic, making it suitable for peer-to-peer choreography as well, without having to modify our worker logic.

Implementing this approach in Java using Spring looks as follows:
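A minimal sketch of what this could look like; the listener id, topic name and helper classes are illustrative, and the repository linked earlier contains the author's full example:

    @Component
    public class LongRunningJobListener {

        private final ContainerControlService containerControl;
        private final LongRunningJobService jobService;

        public LongRunningJobListener(ContainerControlService containerControl,
                                      LongRunningJobService jobService) {
            this.containerControl = containerControl;
            this.jobService = jobService;
        }

        @KafkaListener(id = "long-running-job", topics = "long-running-jobs")
        public void onEvent(String event) {
            // pause before starting the job, so no new events are fetched in the meantime
            containerControl.pause("long-running-job");

            // @Async method that returns a CompletableFuture in this variant
            jobService.startLongRunningJob(event)
                    .whenComplete((result, error) -> {
                        if (error != null) {
                            // retry or dead letter topic logic could go here
                        }
                        // resume is safe to call from the callback thread
                        containerControl.resume("long-running-job");
                    });
        }
    }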

Where the pause and resume functionality is implemented as follows:
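One way to implement the pause and resume helpers is through Spring's KafkaListenerEndpointRegistry (the helper class and method names are assumptions):

    @Service
    public class ContainerControlService {

        private final KafkaListenerEndpointRegistry registry;

        public ContainerControlService(KafkaListenerEndpointRegistry registry) {
            this.registry = registry;
        }

        public void pause(String listenerId) {
            // pauses all partitions assigned to this listener's consumer
            registry.getListenerContainer(listenerId).pause();
        }

        public void resume(String listenerId) {
            // the container is thread safe, so this can be called from the worker/callback thread
            registry.getListenerContainer(listenerId).resume();
        }
    }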

One thing to note is that the low-level Kafka library allows you to call pause on the consumer directly, specifying exactly which partitions to pause. Spring has added a layer of abstraction on top of the low-level consumer called the MessageListenerContainer (the Spring container that runs the consumer), on which you call the pause method. This container does not allow you to specify which topic partitions should be paused; instead, it pauses all partitions assigned to the consumer for all topics the consumer listens to. On the flip side, the container does provide thread safety, allowing us to resume it from the worker thread (in the callback), whereas the low-level consumer object is not thread safe (and doesn't resume when called from the callback thread).

When resources become scarce with the fire-and-forget approach, this solution will work quite well for you, because it limits the amount of concurrency, making it much more stable. Also, we regain the freedom to handle errors gracefully without adding too much complexity, and the queuing behavior of Kafka partitions is retained (due to the limited/no concurrency). However, we do have to add more logic to our consumer to implement this, and we lose the speed benefits concurrency can give us.

Summarizing this approach:

Pros:

  • Stable, even for resource (CPU/Memory) intensive jobs
  • The callback can be used for error handling, retries etc., making it suitable for choreography patterns as well
  • Retain the natural queuing behavior of Kafka partitions

Cons:

  • Requires more code changes than the previous approaches
  • Limited/no concurrency, which can result in longer overall processing time

When to use this approach:

  • You have unpredictable or very long runtimes
  • CPU/Memory intensive jobs
  • The order of job execution matters

Split the job into microprocesses

The last solution, which can be feasible depending on the nature of the job, is to split it up into smaller processes. This solution works well in situations where the job is quite compute-heavy and can be parallelized. Do note that this is more of a design solution, which is technology agnostic.

For example, suppose our job performs three steps for each new event, called A, B and C, and each of them takes at most 4 minutes to process, as in the pseudo code below:
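In pseudo code (step names illustrative), the monolithic job looks roughly like this:

    onEvent(event):
        resultA = performStepA(event)    // takes at most 4 minutes
        resultB = performStepB(resultA)  // takes at most 4 minutes
        resultC = performStepC(resultB)  // takes at most 4 minutes
        // worst case roughly 12 minutes on the consumer thread, well over the 5 minute default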

This example job can be split up into three microprocesses, each with its own consumer and each running within the default time limitations. After refactoring, the (pseudo) code base would be split into consumers A, B and C, as sketched below.
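A rough sketch of the split, where the topics between the stages and the step names are illustrative:

    Consumer A:
        onEvent(event):
            resultA = performStepA(event)          // takes at most 4 minutes
            produce("step-b-topic", resultA)       // hand the result over to the next stage

    Consumer B:
        onEvent(eventFromA):
            resultB = performStepB(eventFromA)     // takes at most 4 minutes
            produce("step-c-topic", resultB)

    Consumer C:
        onEvent(eventFromB):
            resultC = performStepC(eventFromB)     // takes at most 4 minutes
            produce("job-finished-topic", resultC) // or report back to an orchestrator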

There are multiple benefits to dividing long-running jobs into microprocesses, such as easier maintenance and deployment. Each microprocess can be its own microservice, so it can be deployed while other stages run. Also, each microprocess can be given appropriate resources, tailored to the needs of that specific stage, and of course it removes the issue with long-running jobs and Kafka.

However, it does come at a price: added overall complexity in your system. Understanding what used to be a single long-running process now means that you must go over all the individual microprocesses to find out what each of them does and how they tie together. Also, since the process is split up into smaller parts, coordination over these parts needs to happen as well, either using peer-to-peer choreography or orchestration. Not to mention that not all jobs can be stripped down into microprocesses granular enough that each stage fits within the max poll interval.

Given this information, I’d say that this approach has the following pros and cons:

Pros:

  • Individual stages can be deployed and scaled separately
  • Maintainability of individual stages is likely to go up (due to their limited scope)

Cons:

  • Overall complexity of your jobs is likely to increase due to the separation of code
  • The process needs to be coordinated, which adds (even more) complexity or components to the system
  • If one (or more) of the parts can't be scoped granularly enough to fit the default 5 minutes, a different solution needs to be found for that specific part

Some scenarios where dividing the job into microprocesses would be feasible:

  • The job is splittable into smaller units of work
  • The job is critical enough for the system to justify the added complexity

Some tips to take along

While implementing the five solutions above, I came across some implementation details that I found handy, which are detailed in this section.

Partitioning strategy

If the order of execution is irrelevant, make sure the partitioning strategy used by your system distributes the long-running jobs evenly among partitions. In our case we used round robin partitioning to ensure that work got distributed approximately evenly over all replicas in the consumer group handling the long-running jobs.

Good to know: round robin partitioning was the default partitioning strategy for Kafka 2.3 and below (when producing without a key). However, it was replaced by the sticky partitioner in version 2.4 and above. Even though the sticky partitioner provides a lot of performance benefits in scenarios where you have lots of small jobs, it is not helpful when you have a few long-running jobs (where even distribution leads to more parallelism).

Besides using the round robin partitioner, it is of course always possible to write a custom partitioning strategy suitable to your use case.
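As an example, switching a Spring Boot producer back to round robin partitioning could look like this (a custom partitioner would be plugged in via the same property):

    # use Kafka's built-in round robin partitioner instead of the default sticky partitioner
    spring.kafka.producer.properties.partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner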

Limit number of events consumed per poll

In resource-intensive jobs where we used the pause-consumer approach, we found it useful to limit the maximum number of records fetched per poll (we limited it to 1). This can be done by setting the following Spring configuration:
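In a Spring Boot application.properties this could look like:

    # fetch at most one record per poll
    spring.kafka.consumer.max-poll-records=1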

This forces the consumer to process one event at a time, limiting the amount of time the consumer is paused to an absolute minimum. Also, in scenarios where you want to scale up replicas while some consumers are still processing data, this approach gives the highest chance of parallelism after scaling. This scenario is illustrated in the two GIFs below:

Default behavior, polling multiple events at once
Behavior when polling one event at the time

Use manual commits

This tip, too, holds especially for resource-intensive jobs, where the consumer is paused and resumed using the callback. Instead of having Spring auto-commit upon successful deserialization of the event, commit the events yourself in the callback of the event processing logic. This allows you to build proper error handling scenarios as well, by using a dead letter topic, retries (adding the event to the back of the partition or actually retrying the logic) or "nack"-ing the event.

Spring provides a nice wrapper class to handle acknowledgements of events, called “Acknowledgment”, which you can get as a parameter to your Kafka handler in your listener (consumer). To get access to this object, set the following configuration in your application.properties:
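Assuming Spring Boot auto-configuration, this could look like:

    # disable auto-commit and require manual acknowledgement in the listener
    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.listener.ack-mode=manual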

After successful processing, the event can be acknowledged by calling acknowledgment.acknowledge(). Similarly, upon failed processing, the event can be nacked using acknowledgment.nack() or, as mentioned before, acknowledged in combination with some retry logic.
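A minimal sketch of a listener using manual acknowledgement (the topic name and error handling are illustrative):

    @KafkaListener(topics = "long-running-jobs")
    public void onEvent(String event, Acknowledgment acknowledgment) {
        try {
            process(event);                // the long running job (or the callback that follows it)
            acknowledgment.acknowledge();  // commit the offset only after successful processing
        } catch (Exception e) {
            // on failure: retry, produce to a dead letter topic, or nack the event
            // (acknowledgment.nack(...), whose exact signature depends on your spring-kafka version)
        }
    }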

Wrapping it up

That finalizes this blog on dealing with long running jobs using Kafka consumers. In case you want to have some working examples for the different approaches, have a look at this repository: https://github.com/Xavyr-R/kafka-long-running-jobs. The modules in the repo map to the solution as follows:

  • Not-async: the problem illustrated, also contains the configuration of the first solution
  • Spring-async: Fire-and-forget threading
  • Pause-container: Pause consumer with threading and callbacks
  • Pause-container-with-acknowledge: Pause consumer with threading and callbacks combined with the tips of manual commits and limit number of events polled
  • Microprocesses: Split the job into microprocesses

Before leaving you to it, I’d like to repeat that, if your situation allows for it, you will likely be better off using an alternative for running/orchestrating long running jobs instead of Apache Kafka. In my next blog I will introduce workflow engines, which could serve as one of those alternatives. So, stay tuned for future blogs!
