Task queuing in Executors.newFixedThreadPool()

Amardeep Bhowmick
Apr 21, 2019


In Java, when we want a fixed-size thread pool, we usually call Executors.newFixedThreadPool(numberOfThreads) to get an implementation of the ExecutorService interface. The returned implementation is an instance of the class ThreadPoolExecutor, which creates new threads for submitted tasks up to the defined pool size and, once all of those threads are busy, holds further submitted tasks in an internal queue.

So let's say we have a thread pool with a fixed size of 10 threads. Now let's simulate a situation where a lot of incoming tasks need to be executed in this pool and each task is a slow I/O operation:

ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000000; i++) {
    executorService.submit(() -> {
        try {
            Thread.sleep(5000); // to simulate slow I/O
            System.out.println("After Runnable is executed");
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        }
    });
    System.out.println("Task Submitted");
}
executorService.shutdown();

We are submitting a million tasks to a fixed-size thread pool that has only 10 threads. What do you think will happen?

a. executorService.submit() will start to block once all threads are busy and the internal queue is full.

b. executorService.submit() will continue to accept tasks.

The answer is (b): the tasks will be accepted as they come, and there will be no blocking during submission.

This is because, if you look closely at the implementation of Executors.newFixedThreadPool(), it uses an optionally-bounded BlockingQueue implementation, namely LinkedBlockingQueue.

If you study the Javadoc of LinkedBlockingQueue:

The optional capacity bound constructor argument serves as a way to prevent excessive queue expansion. The capacity, if unspecified, is equal to Integer.MAX_VALUE. Linked nodes are dynamically created upon each insertion unless this would bring the queue above capacity.
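You can see both behaviors described in the Javadoc with a couple of lines of code — remainingCapacity() reports how much room the queue thinks it has (the class name QueueCapacityDemo is just for this illustration):

```java
import java.util.concurrent.LinkedBlockingQueue;

public class QueueCapacityDemo {
    public static void main(String[] args) {
        // No capacity argument: bounded only by Integer.MAX_VALUE.
        LinkedBlockingQueue<Runnable> unbounded = new LinkedBlockingQueue<>();
        System.out.println(unbounded.remainingCapacity()); // 2147483647

        // With a capacity argument, the same class is a bounded queue.
        LinkedBlockingQueue<Runnable> bounded = new LinkedBlockingQueue<>(10);
        System.out.println(bounded.remainingCapacity()); // 10
    }
}
```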

Now here is the implementation of the Executors.newFixedThreadPool():

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads,
                                  nThreads,
                                  0L,
                                  TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

Here an instance of the ThreadPoolExecutor class provides the implementation of ExecutorService. Notice the last argument of the ThreadPoolExecutor constructor, which takes a LinkedBlockingQueue. The capacity argument of the LinkedBlockingQueue is left out, meaning the BlockingQueue is effectively unbounded and will hold as many tasks as Integer.MAX_VALUE permits.
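We can observe tasks piling up in that unbounded queue directly. Casting the ExecutorService back to ThreadPoolExecutor (which is what newFixedThreadPool() returns) exposes getQueue(); the pool size of 2 and the counts below are just for a quick demonstration:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class UnboundedQueueDemo {
    public static void main(String[] args) {
        // newFixedThreadPool() returns a ThreadPoolExecutor under the hood.
        ThreadPoolExecutor pool =
                (ThreadPoolExecutor) Executors.newFixedThreadPool(2);

        // Submit far more tasks than there are threads; submit() never blocks.
        for (int i = 0; i < 100; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(100); // simulate slow I/O
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // Only 2 tasks can run at once; the rest sit in the unbounded queue.
        System.out.println("Queued tasks: " + pool.getQueue().size());
        pool.shutdownNow();
    }
}
```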

But what if you want to rate-limit the incoming tasks? That is, if all the threads are busy and an internal queue of pre-defined capacity is also full, we want the submission of new tasks to block. This is not possible with Executors.newFixedThreadPool(); for this we need to configure a custom ThreadPoolExecutor and pass it a bounded queue, such as an ArrayBlockingQueue of fixed capacity.

ExecutorService executorService = new ThreadPoolExecutor(10,
        10,
        0L,
        TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(10));

Here we create a new instance of ThreadPoolExecutor with both the corePoolSize and maximumPoolSize set to the total number of threads, which is 10; the keepAliveTime is 0 milliseconds; and lastly we use a bounded queue of size 10. Now if you run the earlier snippet that submits a million tasks, submission should surely block, right?

No. You would see that just after 20 tasks are submitted (10 running plus 10 queued), we get an exception:

Exception in thread "main" java.util.concurrent.RejectedExecutionException

This is because the queue is full and all the threads are busy, so no more tasks can be accepted: the extra tasks are rejected, which is exactly what we must prevent. To fix this we can supply an instance of RejectedExecutionHandler as another parameter to the ThreadPoolExecutor constructor.
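The rejection is easy to reproduce in miniature. With one thread and a queue of capacity one, the third submission has nowhere to go, and the default handler throws (the small sizes and the 200 ms sleep are just to make the demo deterministic):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    public static void main(String[] args) {
        // No handler supplied, so the default AbortPolicy applies.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1));

        Runnable slow = () -> {
            try { Thread.sleep(200); } catch (InterruptedException ie) { }
        };
        pool.submit(slow); // taken by the single worker thread
        pool.submit(slow); // fills the queue
        try {
            pool.submit(slow); // thread busy, queue full: rejected
        } catch (RejectedExecutionException e) {
            System.out.println("Third task rejected");
        }
        pool.shutdownNow();
    }
}
```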

The Java API provides several default implementations of RejectedExecutionHandler. ThreadPoolExecutor.AbortPolicy always throws RejectedExecutionException when a task is rejected; it is the default, so it is not what we want here. ThreadPoolExecutor.CallerRunsPolicy runs the rejected task in the thread that submitted it to the pool; this looks like something we can use. The two remaining choices, ThreadPoolExecutor.DiscardOldestPolicy and ThreadPoolExecutor.DiscardPolicy, silently drop tasks, which we are not interested in.

Now, just modify the code snippet and pass a new instance of ThreadPoolExecutor.CallerRunsPolicy as the last argument:

ExecutorService executorService = new ThreadPoolExecutor(10,
        10,
        0L,
        TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(10),
        new ThreadPoolExecutor.CallerRunsPolicy());

Now if you submit the tasks you will clearly see that, once 20 tasks are in flight, the submitting thread runs the next rejected task itself and cannot submit anything new until that task finishes. So using this method we slow down the producer thread to match slow consumers, by making it run the rejected tasks itself.
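Note that CallerRunsPolicy throttles the producer but does not strictly block it: the caller spends its time executing a task rather than waiting for queue space. If you want true blocking submission, a common pattern (not covered in this article, so treat it as a sketch) is a custom handler that calls the blocking put() on the pool's own queue — with the caveat that it bypasses the pool's shutdown checks:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingSubmitDemo {
    public static void main(String[] args) throws Exception {
        // Custom handler: block the submitter until queue space frees up,
        // instead of running the rejected task in the caller thread.
        RejectedExecutionHandler blockWhenFull = (r, executor) -> {
            try {
                executor.getQueue().put(r); // blocks until capacity is available
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Interrupted while queuing", ie);
            }
        };

        // Small pool and queue so the handler actually kicks in.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(2), blockWhenFull);

        for (int i = 0; i < 10; i++) {
            pool.submit(() -> {
                try { Thread.sleep(50); } catch (InterruptedException ie) { }
            });
            System.out.println("Task Submitted"); // paced by the workers
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Here the submission loop is paced by the consumers, yet the producer thread never executes a task itself, which keeps submission latency predictable.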


Amardeep Bhowmick

Senior full-stack developer who loves writing functional JavaScript code and also loves playing with Java threads!