ExecutorService Internal Working in Java

Anmol Sehgal
CodeX
Published in
14 min readJul 25, 2021

--

Executor Service Framework would arguably be the most favourite and easy-to-use concept to execute the work in parallel. It helps with parallelising the execution without the developer being required to take the charge of complexities that come along with multi-threading. Developers can spawn up an Executor Service and submit tasks to it, for it to run in a parallel manner.

I have always been fond of understanding how it actually handles all such tasks parallelly and understanding all the nuances which come with it, like CorePoolSize, Queues, Handlers, etc. We will learn more about these concepts in detail, and understand the brilliance put in these sorts of functionalities provided out-of-box which we can leverage.

What is Executor Service?

While programming, you might have come across a scenario where you have some similar tasks to execute, and want to execute them parallel to speed up things. E.g. If you have a Service, which takes user Input and calls backend services to process them.

You can opt for Serial processing, wherewith each input, you make a Backend call, and do so for every Input you receive. But as you can imagine, it would be a slow system.

To enable parallelism, you can leverage the Threading Model where you have say 10 Threads, and each thread can receive an Input, and make the Backend call to process them. With this, your system can support 10 times the load it does earlier. As you would have 10 Threads, waiting to receive Input and processing them, at any given time you would be able to process 10 inputs.
But again, this comes with a price.

You need to worry about the inter-thread communications, Spawning up new Threads at high load and terminating the idle threads at low peaks, ensuring locking, Race conditions, deadlocks between the threads, maintaining states of various threads, etc. So to ease out these things, so that a developer can just focus on the core things he is interested in, eg. here just making backend calls, there is ExecutorService Framework in Java JDK.

ThreadPoolExecutor is just the implementation of the ExecutorService interface.

Basic Working of Thread Pool Executors:

ThreadPool Executors need some defined parameters, which it uses to control the number of threads it will create to handle the tasks submitted to it. We will see in detail about all these params, but let's understand how ThreadPoolExecutors actually work, from 1000ft height:

So the basic fundamental is the tasks submitted frequency may be out of sync with the task process rate, meaning if 100 tasks are submitted per second and only 10 tasks are able to process in 1 second, by all threads, then we still don't want to lose those 90 tasks.

So there is a well-known Data structure to handle such cases: Queues.
So if no Thread is free, we can push all tasks to the Queue, and tasks will eventually be read by the Threads for executions once they are freed from their current work.

Then the Thread from the ThreadPool can come, read the oldest task which was submitted(which lies at the head of the Queue), and can start processing it.

So using the queue, the threads can process the tasks at their own pace.
Depending upon certain parameters given, the thread count can increase/decrease, and we will see how that works below.

Jargons in Thread Pool Executors:

1. CorePoolSize

CorePoolSize is the number of threads that must be run in parallel as the new task arrives. If say CorePoolSize = 10 and only 5 tasks have been submitted so far, then we would have 5 Threads running/active yet. When the 6th Task will be submitted, instead of putting the new task into the Queue, the 6th Thread will be created and this task will be submitted to it immediately. This is true even if the previously created 5 Threads were idle and had done their work and waiting for new tasks. So if fewer than the CorePoolSize threads are running, a new thread will always be created to handle the new Task.

2. MaximumPoolSize

This property governs the maximum number of threads that can run. SO when we have CorePoolSize number of threads created, and new task arrives, they would be put to the Queue. Now once the queue also gets full, then new Threads will be created if MaximumPoolSize > CorePoolSize. So say CorePoolSize was 10 and MaximumPoolSize=15, and we have a Queue of size 10. If we have 20 requests that came in at a high frequency, such that all 10 Threads were created and started running their tasks, and extra 10 tasks were put to the Queue. Now when the 21st Task will arrive, and since MaximumPoolSize > CorePoolSize, a new Thread will be created which can start reading from the Queue to make a vacancy for the 21st Task in the Queue.

So the Rules when a new task arrives is:

a. If fewer than CorePoolSize threads are running, the Executor always prefers creating a new thread rather than queuing.

b. If CorePoolSize or more threads are running, the Executor always prefers queuing a request rather than creating a new thread.

c. If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

4. KeepAliveTime

This prevents the Threads to be lying Idle and wasting resources, and this can be used to govern when to kill the idle threads. So if CorePoolSize=10 and with the above example say we had 11 threads running. Once the tasks are all handled and say these threads are waiting for new tasks to arrive, they would be lying idle and wasting resources. Then if the excess Threads(more than CorePoolSize, i.e. 1 here) are lying idle for more than KeepAliveTime, then those will be terminated. So this property is useful to reduce resources consumption when the pool is not being actively used.

5. workQueue

The queue is used to hold the submitted tasks. There can be 3 types of Queue that can be used:

  • Direct handoffs(like Synchronous Queue): It won't hold any task and whenever a request comes to add to the Queue, it hands-off to the Thread. In case no thread is available, then the en-queue operation will fail. Usually, MaximumPoolSize is set to a very high number if such a queue is to be used so that we can handle all the incoming requests with new Threads, instead of holding them to the Queue. It finds its usage where more real-time operations are to be performed without storing them and executing them later.
  • Unbounded Queue(like LinkedBlockingQueue): Such Queues do not have any size set, thus called unbounded. Since such queues will never get full, the en-queue will never fail, and more threads than the CorePoolSize will never be created, and hence in such queues, MaximumPoolSize property is not honored. (Recall that if Addition to Queue fails, then new Threads will be created if MaximumPoolSize > CorePoolSize).
  • Bounded Queue( like ArrayBlockingQueue): Such queues have fixed size, so when requests > CorePoolSize, a new Request will be added to the Queue. Once this queue is also full, new threads(if MaximumPoolSize > CorePoolSize) will be created.

6. ThreadFactory

Since new Threads are created with new requests coming in, we need a Factory that can create a new Thread. By default, all the threads created are of the same Priority, of the same Thread-group, non-daemon status, etc. Of course, such things can be changed by providing your own ThreadFactory.

7. RejectedExecutionHandler

When the Executor has been shut down, and if both Queue is full and MaximumPoolSize threads are created, new requests can not be handled, so in these scenarios, the incoming request will be rejected. You can provide your own RejectionHandler to handle such rejections. By default there are 4 such Handlers provided:

  • AbortPolicy: An RejectedExecutionException is thrown if this handler is used.
  • CallerRunsPolicy: With this handler, the Client itself runs the task, once it's rejected by the Executor.
  • DiscardPolicy: With this handler, the rejected Request is simply dropped/ignored.
  • DiscardOldestPolicy: With this handler, the task oldest in the Queue is dropped, and then this Task is retried.

Note that ThreadPool has 5 states:

  1. Running: ThreadPool can receive new tasks.
  2. Shutdown. Do not accept new tasks, but can process the tasks which are already added in the Queue.
  3. Stop: Do not accept new tasks, and do not process tasks present in the queue, and also interrupt the currently executing tasks.
  4. Tidying: All tasks have been terminated, WorkerCount is zero. The threads will then be in the Tidying state and will run the Terminated() hook method soon.
  5. Terminated: Perform Terminated() method. It's a hook method provided so that we can do some handling at the termination of ThreadPool.

Creating the ThreadPoolExecutor:

By Specifying the corePoolSize, MaximumPoolSize, keepAliveTime, etc, we can modify how we want the ThreadPoolExecutor to look.

But if we do not want to worry so much about all these, then we can also leverage some inbuilt methods present:

1. Fixed thread pool executor — Creates a thread pool that reuses a fixed number of threads to execute any number of tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available.

2. Cached thread pool executor — Creates a thread pool that creates new threads as needed, and reuses previously constructed threads when they are available. Since this creates threads as and when needed, this must NOT be used if tasks are long-running. It can bring down the system if the number of threads goes beyond what the system can handle. If you can think how is this working- it's simple, it has maximumPoolSize set to be Infinite.

3. Scheduled thread pool executor — Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.
To do this, a special type of Queue(DelayedWorkQueue) is used where when the request to poll comes, it only returns the value after a certain delay.

4. Single thread pool executor — Creates single thread to execute all tasks. Since all tasks will be executed by a single thread, all tasks execute sequentially. As you might have guessed, it has corePoolSize and maximumPoolSize = 1.

There is another very ingenious thing used in ThreadPoolExecutors, which I liked a lot. Please give it a read if the Binary Manipulation interests you. It's the usage of ctl.

ThreadPoolExecutor needs to know 2 states every time:

  • State it is in. There are 5 states: RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED.
  • Number of Threads running.

We could have used an Object to have these 2 states, but since both of these are Integers, and to speed up things, a single Integer value is used to represent both these states. Let's see how it is done.

An integer has 32 bits in java. So we have to leverage these 32 bits to represent the above 2 parameters. Since we have 5 States only, and 5 can be represented using 3 bits: 000, 001, 010, 011, 101, so the leftmost 3 bits in the 32-bit Integer are used to represent the State of Thread pool Executor.

The remaining 29 bits are used to represent the number of threads running in the Thread pool Executor. So now we have 2 operations to worry about:

  • How to club the 2 parameters into one Integer?
  • How to retrieve State and WorkerCount from one Integer?

Let's try to understand these:

COUNT_BITS = Integer.SIZE - 3; 
COUNT_BITS = 32 - 3 = 29
CAPACITY = (1 << COUNT_BITS) - 1;
i.e. 00000000000000000000000000000001 << 29 - 1
i.e. 00100000000000000000000000000000 - 1
i.e. 00011111111111111111111111111111

All the states are to be stored in leftmost 3 bits and carry values between 0 to 5, so these 0–5 values are left-shifted by 29 bits. e.g.

RUNNING   = -1 << COUNT_BITS i.e.  11100000000000000000000000000000
SHUTDOWN = 0 << COUNT_BITS i.e. 00000000000000000000000000000000
STOP = 1 << COUNT_BITS i.e. 00100000000000000000000000000000
TIDYING = 2 << COUNT_BITS i.e. 01000000000000000000000000000000
TERMINATED = 3 << COUNT_BITS i.e. 01100000000000000000000000000000

So we can see the States have only the first 3 bits set and the rest of 29 bits are 0’s, for those to be used by the number of running threads, or “WorkerThreads”. To club the State and WorkerThreads, the ctlOf method is used:

int ctlOf(int rs, int wc) { return rs | wc; }E.g. if rs = RUNNING = 11100000000000000000000000000000
And wc = 2 = 00000000000000000000000000000010
Then ctl = 11100000000000000000000000000000 | 00000000000000000000000000000010
= 11100000000000000000000000000010

As we can see in 11100000000000000000000000000010 First 3 bits = 111 = RUNNING and rest 29 bits representing 2 = worker threads. To retrieve back these values from ctl: The RunStateOf method is used:

runStateOf(int c)     { return c & ~CAPACITY; }
e.g. if c = 11100000000000000000000000000010
CAPACITY = 00011111111111111111111111111111
~CAPACITY = 11100000000000000000000000000000
c & ~CAPACITY =
11100000000000000000000000000010
& 11100000000000000000000000000000 to get
11100000000000000000000000000000 i.e. RUNNING State

Since the ~CAPACITY has right 29 bits = 0, & with ctl will make 29 bits 0 and will only return leftmost 2 bits. Similarly to get back the worker count, WorkerCountOf is used:

workerCountOf(int c)  { return c & CAPACITY; }
e.g. if c = 11100000000000000000000000000010
CAPACITY = 00011111111111111111111111111111
c & CAPACITY =
11100000000000000000000000000010
& 00011111111111111111111111111111 to get
00000000000000000000000000000010 i.e. WorkerCount=2

Since CAPACITY has all 29 bits 1, and leftmost 3 bits=0, & with ctl will only return the rightmost 29 bits value, which reflects the workerCount. So we can see how the simple bit manipulation concept is used to encapsulate 2 important properties of the ThreadPoolExecutor.

If the above theory is clear, we can move into how the code looks like in the Thread Pool Executor, and how does it honor all the above use cases. Execute(Runnable runnable) — where we submit the task to the ExecutorService.

It has 3 Scenarios covered in the 3 if blocks- If a new Thread can be created, If Task can be pushed to the Queue, or if the Task needs to be rejected.

Line 8–12: If WorkerThread count < CorePoolSize, then we need to spawn up a new Thread and hand over the task to it Directly.
We will see what addWorker does, but it basically creates a new Thread and lets it run the Runnable/task.

Line 17: If WorkerThread counts ≥ CorePoolSize, then we need to push the incoming task to the Queue if the ThreadPool is not stopped(is in running state).

Line 22–23: To handle the case where the ThreadPool was stopped after the new task was added to the queue, we recheck if ThreadPool is still Running. If not, we undo the Queue. offer() and reject the task.

Line 27–28: To handle the scenario where CorePoolSize was set to 0. In such cases, the tasks will be added to the Queue, but the thread will never be created to execute those. So we check the count of active threads. If WorkerThread count is 0, we simply create a new Thread which will read from the Queue to process the tasks present in it.

Line 32–33: If the WorkerThread count < CorePoolSize and the Queue was already full, and we were not able to add this task to the Queue, we will simply reject the task.

Add Worker Method:

AddWorker () has three functions:

  1. addWorker(Runnable firstTask, corePoolSize=true)
    Generate a thread with the firstTask and the new Thread has to be created inside the CorePoolSize boundary. It can be seen as a method of generating a core thread.
  2. addWorker(Runnable firstTask, corePoolSize=false)
    Generate a thread with the firstTask and a new Thread has to be created inside the MaximumPoolSize boundary. It can be seen as a method of producing a non-core thread.
  3. addWorker(Runnable firstTask = null, corePoolSize=false)
    Generate an idle thread (no firstTask), a new Thread has to be created inside the MaximumPoolSize boundary. Since it has null as FirstTask, it will be reading from the Queue to get the task it needs to execute.

Recall that if ThreadPool = RUNNING state, then it can take new Tasks to execute. If ThreadPool = SHUTDOWN state, then it can not take new tasks(so firstTask = null for SHUTDOWN case), but it can still execute the QUEUED tasks.

Let's try to understand this in detail:

Line 12–14: Check if the ThreadPool is in a Running state. If yes, then we can proceed ahead. If it is in SHUTDOWN state, then check if firstTask is null as in shutdown state, it can not accept new tasks, but can only execute the Queued tasks. In the Shutdown state, it also checks if the Queue is not empty. had the Queue been empty, then there is no task to execute, so there is no point in creating a new thread.

Line 18–22: Check how many WorkerThreads are currently active. We check if the WorkerThreads exceeds the Thread Capacity allowed by OS, and if those are more than the CorePoolSize and maximumPoolSize, then we cant create the new Thread, so we won't proceed forward.

Line 25–26: If none of the above conditions holds true, we increment the worker count. If the increment was done successfully we can continue to create a new thread, or else this logic executes again in the loop.

Now that we know that the new task is present(either firstTask or inside the Queue), we can continue creating the new Thread(Worker Thread).

Line 41: Create a new Worker Thread passing in the firstTask to it. Worker Thread has the logic(we will see in a later section) to handle the case where if the firstTask passed is null, it reads from the Queue to get the task to execute, else it first executes this firstTask.

Line 52–64: We acquire the lock before we access/update the worker list. But to handle the scenario where the ThreadPool might get ShutDown during the lock acquire, we recheck the conditions again.
Check if ThreadPool is in Running State, then we are good. And if it's in a Shutdown state and the firstTask was empty, then also we can create the new Thread. These are checks which were earlier done and are rechecked again.
If we can proceed ahead, we add the newly created Worker Thread to the list of workers.

Line 70–72: Once we have created the new Worker Thread, we start it immediately, so that it can start executing the firstTask or the Task present in the Queues. We will see how this WorkerThread works below.

Line 76–78: If the Worker Thread was unable to start due to any reason, we need to undo the operations done.
We will remove this worker from workerList + decrement the worker count + terminate/interrupt this Worker. This logic is handled in the addWorkerFailed method.

Worker Threads

Workers are nothing but the Runnable:

As seen in the above addWorker handling, when the Worker is created we start it. So we will see how its run method is implemented, as that will be called once the Worker is started.

Once the WorkerThread is started, it will first try to execute the firstTask provided to it. Once it has executed that, or if that is null, it will pick the new task from the Queue, and execute it. It will continue to do so until the ThreadPool is stopped, so once say we have 10 Worker Threads started, all these 10 will be polling from the Queue to pick tasks to execute.

Line 12: If the FirstTask = null, it will pick a new task from the Queue to run.

Line 25–37: It calls the beforeExecute hook method, then runs the task, and then calls the afterExecute hook method. These are useful if the client wants to do some extra logging/handling before and after the task was executed.

Line 46: To handle the ThreadPool status changes, or External errors coming while executing the tasks, the processWorkerExit is called.

Line 2–14: If this method was called due to some exception(completedAbruptly=true) then we decrement the worker count, remove this worker from the worker list, and terminate this worker Thread.

Line 17-25: If the ThreadPool is still running and if WorkerThreads are lesser than CorePoolSize and if Queue has tasks to execute, then this method will create a new WorkerThread, basically replacing the previous one which failed.

Summarizing

  1. ThreadPool Executor has the CorePoolSize which governs how many active threads spawn up, with every incoming request.
  2. Once the CorePoolSize threads are active, new incoming tasks are added to the Queue, and these threads are actively polling from the Queue to execute them.
  3. Once the Queue gets full, new threads may be created(if maximumPoolSize > CorePoolSize), to handle more load.
  4. The new threads created will be started immediately, and will either start executing the task given or by pulling from the queue.
  5. If any worker thread errors out, due to some external error while executing the task, then a new Worker Thread will be created to replace it.

--

--