Task queuing in Executors.newFixedThreadPool()
In Java, when we want a fixed-size thread pool, we usually reach for Executors.newFixedThreadPool(numberOfThreads) to get an implementation of the ExecutorService. The implementation is an instance of the class ThreadPoolExecutor, which creates new threads for submitted tasks up to the defined pool size and, once all of those threads are busy, holds further submitted tasks in an internal queue.
So let's say we have a thread pool with a fixed size of 10 threads. Now let's simulate a situation where we are getting a lot of incoming tasks that need to be executed in this pool, and the tasks are slow I/O operations:
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000000; i++) {
    executorService.submit(() -> {
        try {
            Thread.sleep(5000); // to simulate slow I/O
            System.out.println("After Runnable is executed");
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        }
    });
    System.out.println("Task Submitted");
}
executorService.shutdown();
We are submitting a million tasks to a fixed-size thread pool that has only 10 threads. What do you think will happen?
a. executorService.submit()
will start to block once all threads are busy and the internal queue is full.
b. executorService.submit()
will continue to accept tasks.
The answer is (b): the tasks will be accepted as they come, and there will be no blocking during the submission of the tasks.
This is because, if you look closely at the implementation behind Executors.newFixedThreadPool(), it uses an optionally bounded BlockingQueue implementation, namely the LinkedBlockingQueue.
If you study the Java doc of the LinkedBlockingQueue
:
The optional capacity bound constructor argument serves as a way to prevent excessive queue expansion. The capacity, if unspecified, is equal to
Integer.MAX_VALUE
. Linked nodes are dynamically created upon each insertion unless this would bring the queue above capacity.
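To see that bound in action, here is a small sketch (the class name is just for illustration) contrasting a LinkedBlockingQueue left unbounded with one given an explicit capacity of 2:

```java
import java.util.concurrent.LinkedBlockingQueue;

public class QueueCapacityDemo {
    public static void main(String[] args) {
        // No capacity argument: the bound defaults to Integer.MAX_VALUE
        LinkedBlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        System.out.println(unbounded.remainingCapacity()); // 2147483647

        // Explicit capacity of 2: the third offer() is refused
        LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<>(2);
        System.out.println(bounded.offer("a")); // true
        System.out.println(bounded.offer("b")); // true
        System.out.println(bounded.offer("c")); // false, queue is at capacity
    }
}
```

Note that offer() reports a full queue by returning false, while put() would block instead.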
Now here is the implementation of the Executors.newFixedThreadPool()
:
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
Here an instance of the ThreadPoolExecutor class is used to provide the ExecutorService implementation. Notice the last argument of the ThreadPoolExecutor constructor, which takes a LinkedBlockingQueue. The capacity argument of the LinkedBlockingQueue is left out, meaning the BlockingQueue is unbounded: it will hold up to Integer.MAX_VALUE tasks.
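A quick way to observe this is to peek at the executor's internal queue right after flooding it. The helper below is only a sketch (the method name and the counts 2 and 50 are chosen for illustration): it submits 50 slow tasks to a 2-thread pool and reports how many of them ended up waiting in the queue.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class UnboundedQueueDemo {
    // Submit 'tasks' long-running jobs to a fixed pool of 'threads' workers
    // and report how many of them ended up waiting in the internal queue.
    static int queuedAfterSubmitting(int threads, int tasks) {
        ThreadPoolExecutor pool =
                (ThreadPoolExecutor) Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(60_000); // simulate slow I/O
                } catch (InterruptedException ignored) { }
            });
        }
        int queued = pool.getQueue().size();
        pool.shutdownNow(); // interrupt the sleeping workers
        return queued;
    }

    public static void main(String[] args) {
        // 2 tasks run immediately on the workers; the rest just pile up
        System.out.println("Queued tasks: " + queuedAfterSubmitting(2, 50));
    }
}
```

The first two tasks are handed directly to the newly created worker threads, so 48 of the 50 end up in the unbounded queue.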
But what if you want to rate-limit the incoming tasks? That is, if all the threads are busy and an internal queue of pre-defined capacity is also full, we want to block on the incoming task submission. This is not possible with Executors.newFixedThreadPool(); for this we need to configure a custom ThreadPoolExecutor and pass it a bounded queue, like an ArrayBlockingQueue of a fixed capacity.
ExecutorService executorService = new ThreadPoolExecutor(10, 10,
        0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(10));
Here we are creating a new instance of ThreadPoolExecutor with the same corePoolSize and maximumPoolSize as the total number of threads, which is 10; the keepAliveTime is 0 milliseconds; and lastly we are using a bounded queue of capacity 10. Now if you run the earlier snippet where we submit a million tasks, the submission should surely block, right?
No, you would see that just after 20 tasks are submitted we get an exception:
Exception in thread "main" java.util.concurrent.RejectedExecutionException
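You can pinpoint the moment of rejection by catching the exception. The sketch below (the helper name is ours) uses the same 10-thread, 10-slot configuration and counts how many submissions succeed before one is rejected:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    // Count successful submissions before the default policy rejects one.
    static int acceptedBeforeRejection() {
        ExecutorService executorService = new ThreadPoolExecutor(10, 10,
                0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
        int accepted = 0;
        try {
            for (int i = 0; i < 1_000_000; i++) {
                executorService.submit(() -> {
                    try {
                        Thread.sleep(5000); // simulate slow I/O
                    } catch (InterruptedException ignored) { }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // 10 tasks occupy the threads, 10 fill the queue: the 21st is rejected
        } finally {
            executorService.shutdownNow();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("Accepted before rejection: " + acceptedBeforeRejection());
    }
}
```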
This is because the queue is full and all the threads are busy, so no more tasks can be accepted and the extra tasks are rejected. We must prevent that from happening. To fix this we can supply an instance of RejectedExecutionHandler as another parameter to the ThreadPoolExecutor constructor.
There are several default implementations of the RejectedExecutionHandler provided in the Java API:
ThreadPoolExecutor.AbortPolicy: always throws RejectedExecutionException when a task is rejected. This is the default, so it is not what we want here.
ThreadPoolExecutor.CallerRunsPolicy: runs the rejected task in the thread that submitted it to the pool. This looks like something we can use.
ThreadPoolExecutor.DiscardOldestPolicy and ThreadPoolExecutor.DiscardPolicy: silently drop the oldest queued task or the rejected task, respectively. We are not interested in these here.
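As a quick sanity check of what CallerRunsPolicy does (the names below are illustrative), we can force a rejection on a tiny pool and record which thread ends up executing the rejected task:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
    // Returns the name of the thread that executed the rejected (third) task.
    static String threadOfRejectedTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());

        Runnable slow = () -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException ignored) { }
        };
        pool.execute(slow); // occupies the single worker thread
        pool.execute(slow); // waits in the queue of capacity 1

        String[] rejectedRanOn = new String[1];
        // This third task is rejected, so CallerRunsPolicy executes it right
        // here, synchronously, on the submitting thread.
        pool.execute(() -> rejectedRanOn[0] = Thread.currentThread().getName());

        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ignored) { }
        return rejectedRanOn[0];
    }

    public static void main(String[] args) {
        System.out.println("Rejected task ran on: " + threadOfRejectedTask());
    }
}
```

The returned name is that of the submitting thread, not a pool worker.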
Now, just modify the code snippet and add a new instance of ThreadPoolExecutor.CallerRunsPolicy
as the last argument:
ExecutorService executorService = new ThreadPoolExecutor(10, 10,
        0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(10),
        new ThreadPoolExecutor.CallerRunsPolicy());
Now if you submit the tasks you would clearly see that once 20 tasks are submitted (10 running, 10 queued), the next task is rejected and executed on the submitting thread itself, so further submissions are blocked until it completes! Using this method we can slow down the producer thread, by making it run the rejected tasks itself, whenever the consumer threads fall behind.
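If you would rather have the submitter simply wait instead of executing rejected tasks itself, a common alternative is a custom RejectedExecutionHandler that blocks on the queue's put(). The sketch below is an illustration, not a drop-in solution: calling getQueue().put() bypasses the pool's shutdown checks, so a task enqueued while the pool is shutting down could be silently dropped.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingSubmitDemo {
    // Run 'tasks' jobs through a small bounded pool, parking the submitting
    // thread whenever the queue is full, and report how many completed.
    static int runAll(int tasks) {
        // Hypothetical handler: block the submitter on put() until a worker
        // frees a queue slot, instead of running the task in the submitter.
        RejectedExecutionHandler blockOnFull = (task, pool) -> {
            try {
                pool.getQueue().put(task); // blocks until the queue has room
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Interrupted while queuing", e);
            }
        };

        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(2), blockOnFull);

        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(50); // simulate slow I/O
                } catch (InterruptedException ignored) { }
                completed.incrementAndGet();
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException ignored) { }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(10) + " tasks completed, none rejected");
    }
}
```

Every task eventually runs, and the producer is throttled purely by waiting rather than by doing the consumers' work.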