How it works in Java. Thread Pool.

Sergey Kuptsov
Mar 3, 2017

The main principle of programming says not to reinvent the wheel. But sometimes, to understand what is going on and how not to misuse a tool, we need to do exactly that. Today we are reinventing the multithreaded execution pattern.

Imagine you have a CPU-intensive task like this:

public class Counter {

    public Double count(double a) {
        for (int i = 0; i < 1000000; i++) {
            a = a + Math.tan(a);
        }

        return a;
    }
}

We want to process a number of such tasks as fast as possible. Let’s try a single thread first*:

import static java.lang.String.format;

public class SingleThreadClient {

    public static void main(String[] args) {
        Counter counter = new Counter();

        long start = System.nanoTime();

        // run all 400 tasks one after another on the main thread
        double value = 0;
        for (int i = 0; i < 400; i++) {
            value += counter.count(i);
        }

        // elapsed time in whole seconds (integer division)
        System.out.println(format("Executed in %d s, value : %f",
                (System.nanoTime() - start) / 1_000_000_000,
                value));
    }
}

On my Mac with 4 physical cores, the execution time is 104 sec, and CPU utilization as reported by “top -pid {pid}” is:

[Screenshot: top output for the single-threaded run]

As you can see, the single running thread keeps the Java process at 100% CPU (one fully busy core), but the overall user-space CPU load of the system is only 2.5%, so a lot of system resources are left unused.

Let’s try to use more of them by adding worker threads:

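import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static java.lang.String.format;

public class MultithreadClient {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(8);
        Counter counter = new Counter();

        long start = System.nanoTime();

        // submit all 400 tasks to the pool; each supplyAsync call returns immediately
        List<Future<Double>> futures = new ArrayList<>();
        for (int i = 0; i < 400; i++) {
            final int j = i;
            futures.add(
                    CompletableFuture.supplyAsync(
                            () -> counter.count(j),
                            threadPool
                    ));
        }

        // wait for every result and sum them up
        double value = 0;
        for (Future<Double> future : futures) {
            value += future.get();
        }

        System.out.println(format("Executed in %d s, value : %f",
                (System.nanoTime() - start) / 1_000_000_000,
                value));

        threadPool.shutdown();
    }
}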

Resource utilization is:

[Screenshot: top output for the run with 8 threads]

This is much better: with 8 running threads CPU utilization is almost 100%, and the total execution time drops from 104 to 15 sec.

ThreadPoolExecutor

To speed things up we used a thread pool. In Java this role is played by ThreadPoolExecutor, which can be instantiated directly or via one of the factory methods of the utility class Executors. Inside ThreadPoolExecutor there is a queue

private final BlockingQueue<Runnable> workQueue;

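where tasks are queued once corePoolSize threads are already running. If fewer than corePoolSize threads are running, execute() tries to start a new thread with the submitted task as its first task. If the queue is full, it tries to add a thread beyond the core size (up to maximumPoolSize) and rejects the task if that fails: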

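public void execute(Runnable command) {
    ...
    // 1. fewer than corePoolSize workers: start a new one with this task as its first task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        ...
    }
    // 2. otherwise queue the task and make sure at least one worker is alive to run it
    if (isRunning(c) && workQueue.offer(command)) {
        ...
            addWorker(null, false);
    }
    // 3. queue full: try a non-core worker, reject the task if that fails
    ...
}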
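The branching in execute() can be observed directly. The sketch below is a hypothetical demo class (ExecutePolicyDemo is not part of the original sources) that creates a ThreadPoolExecutor with corePoolSize = 2, maximumPoolSize = 4 and an ArrayBlockingQueue of capacity 2, and submits tasks that block on a latch so that no worker finishes early:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ExecutePolicyDemo {

    /** Submits `tasks` blocking tasks and returns {poolSize, queueSize, rejected}. */
    static int[] run(int tasks) {
        // corePoolSize = 2, maximumPoolSize = 4, a queue with room for 2 tasks
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep every worker busy while we submit
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // queue full and maximumPoolSize reached: AbortPolicy throws
            }
        }
        int[] result = {pool.getPoolSize(), pool.getQueue().size(), rejected};
        release.countDown(); // let all accepted tasks finish
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        int[] r = run(7);
        System.out.printf("threads=%d queued=%d rejected=%d%n", r[0], r[1], r[2]);
    }
}
```

With 7 tasks, the first two start core threads, the next two wait in the queue, tasks 5 and 6 get non-core threads, and task 7 is rejected by the default AbortPolicy, so main prints threads=4 queued=2 rejected=1.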

Each successful addWorker call starts a new thread whose Worker runs a loop that takes tasks from workQueue and executes them:

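final void runWorker(Worker w) {
    Runnable task = w.firstTask;
    ...
    try {
        // getTask() takes the next task from workQueue: workQueue.take(), or
        // workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) for threads that may time out
        while (task != null || (task = getTask()) != null) {
            ...
            task.run();
            ...
        }
        ...
    } finally {
        ...
    }
}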

ThreadPoolExecutor has very clear Javadoc, so there is little point in paraphrasing its functionality. Instead, let’s write our own:

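import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

public class ThreadPool implements Executor {
    private final Queue<Runnable> workQueue = new ConcurrentLinkedQueue<>();
    private volatile boolean isRunning = true;

    public ThreadPool(int nThreads) {
        // start all worker threads up front; they live until shutdown()
        for (int i = 0; i < nThreads; i++) {
            new Thread(new TaskWorker()).start();
        }
    }

    @Override
    public void execute(Runnable command) {
        if (!isRunning) {
            // fail loudly instead of silently dropping the task
            throw new RejectedExecutionException("thread pool is shut down");
        }
        workQueue.offer(command);
    }

    // workers stop after their current task; tasks still in the queue are abandoned
    public void shutdown() {
        isRunning = false;
    }

    private final class TaskWorker implements Runnable {

        @Override
        public void run() {
            // busy-polls the queue: simple, but an idle worker keeps burning a CPU core
            while (isRunning) {
                Runnable nextTask = workQueue.poll();
                if (nextTask != null) {
                    nextTask.run();
                }
            }
        }
    }
}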
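One weakness of this pool is that idle workers spin on poll(). As a sketch (not the author's code; the class name BlockingThreadPool is hypothetical), workers can instead sleep in take() on a BlockingQueue, which is what ThreadPoolExecutor's workers do, and shutdown() can interrupt them so that they drain the remaining tasks and exit:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

final class BlockingThreadPool implements Executor {
    private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();
    private volatile boolean isRunning = true;

    BlockingThreadPool(int nThreads) {
        for (int i = 0; i < nThreads; i++) {
            Thread worker = new Thread(this::workLoop, "pool-worker-" + i);
            workers.add(worker);
            worker.start();
        }
    }

    @Override
    public void execute(Runnable command) {
        if (!isRunning) {
            throw new RejectedExecutionException("thread pool is shut down");
        }
        workQueue.add(command);
    }

    /** Stops accepting tasks; workers finish the queued ones and then exit. */
    public void shutdown() {
        isRunning = false;
        for (Thread worker : workers) {
            worker.interrupt(); // wake up workers blocked in take()
        }
    }

    /** Waits until every worker thread has exited (call after shutdown()). */
    public void awaitTermination() throws InterruptedException {
        for (Thread worker : workers) {
            worker.join();
        }
    }

    private void workLoop() {
        while (true) {
            Runnable task;
            try {
                // while running, sleep in take() instead of spinning; after shutdown, drain
                task = isRunning ? workQueue.take() : workQueue.poll();
            } catch (InterruptedException e) {
                continue; // woken by shutdown(): re-check isRunning and drain the queue
            }
            if (task == null) {
                return; // shut down and nothing left to run
            }
            task.run();
        }
    }
}
```

Because it implements Executor, it can be passed to CompletableFuture.supplyAsync just like the pool above. A task submitted concurrently with shutdown() can still slip into the queue after the workers have exited; ThreadPoolExecutor closes that race with extra state checks, which this sketch omits.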

Now let’s execute the same task as above with our pool.

// Using our pool - change the line in MultithreadClient:
// ExecutorService threadPool = Executors.newFixedThreadPool(8);
ThreadPool threadPool = new ThreadPool(8);

Execution time is practically the same — 15 sec.

Thread pool size

Let’s increase the number of threads in the pool even further, to 100:

ThreadPool threadPool = new ThreadPool(100);

Execution time degrades to 28 sec. Why did this happen?

There are several independent reasons why performance falls off as the number of threads grows:

  • Context switches. The processor is interrupted while working on one task and has to switch to another, which involves saving and restoring state. While a core is busy swapping states, it does no useful work on any task. Context switches are much cheaper on modern CPUs and system call interfaces than they used to be, but they are still far from free.
  • Cache line contention. One task is likely to be working on a particular area of RAM, and the task that takes its place is likely to be working on a different one, so data cached on the CPU chip gets evicted, only to be reloaded when the first task resumes. Besides that, the threads grab control of cache lines from each other, causing stalls.

You can observe the number of context switches in the csw column of top.

[Screenshots: top output on 8 threads and on 100 threads]

How to choose the pool size?

The right size depends on the type of tasks being executed. Needless to say, the pool size should rarely be hard-coded; it should be configurable, with the optimal value chosen by monitoring throughput.

Assuming that threads do not block each other, there are no I/O wait cycles and all tasks take about the same processing time, the optimal pool size is Runtime.getRuntime().availableProcessors() + 1. The extra thread is there because even compute-intensive threads occasionally take a page fault or pause for some other reason, and a spare runnable thread keeps the CPU busy when that happens.
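A minimal sketch of a configurable size with the CPU-bound default, assuming the size is passed as a JVM system property (the property name app.pool.size and the class PoolConfig are hypothetical):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class PoolConfig {
    /** Hypothetical property name; override with e.g. -Dapp.pool.size=16 */
    static final String POOL_SIZE_PROPERTY = "app.pool.size";

    /** Default for CPU-bound tasks: one thread per available processor plus one spare. */
    static int defaultCpuBoundSize() {
        return Runtime.getRuntime().availableProcessors() + 1;
    }

    /** Size from the system property; falls back to the default if unset or not a number. */
    static int configuredSize() {
        return Integer.getInteger(POOL_SIZE_PROPERTY, defaultCpuBoundSize());
    }

    public static void main(String[] args) {
        int size = configuredSize();
        ExecutorService pool = Executors.newFixedThreadPool(size);
        System.out.println("Using " + size + " threads");
        pool.shutdown();
    }
}
```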

If threads spend much of their time waiting for I/O, the pool size needs to be multiplied by (1 + wait time / compute time). For example, if a task spends 50% of its time in iowait, its wait time equals its compute time, so the pool size can be 2 * Runtime.getRuntime().availableProcessors() + 1.
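One way to write this rule is N_threads = N_cpu * (1 + W/C) + 1, where W/C is the ratio of wait time to compute time; with W = 0 it reduces to the CPU-bound rule N_cpu + 1. A small helper under that reading (the class PoolSizing is hypothetical; W and C would come from profiling):

```java
final class PoolSizing {
    /**
     * Pool size for tasks that spend `waitTime` blocked (e.g. on I/O) for every
     * `computeTime` spent on the CPU: cpus * (1 + W/C), plus one spare thread.
     */
    static int poolSize(int cpus, double waitTime, double computeTime) {
        if (cpus < 1 || waitTime < 0 || computeTime <= 0) {
            throw new IllegalArgumentException("need cpus >= 1, waitTime >= 0, computeTime > 0");
        }
        return (int) Math.ceil(cpus * (1 + waitTime / computeTime)) + 1;
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // 50% of the time in iowait: W/C = 50/50 = 1, so the size is 2 * cpus + 1
        System.out.println(poolSize(cpus, 50, 50));
    }
}
```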

Other kinds of pools

ScheduledThreadPool can schedule commands to run after a given delay (or periodically). ScheduledThreadPoolExecutor extends ThreadPoolExecutor and uses a DelayedWorkQueue as its workQueue, which is organized internally as an array-based heap:

private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];

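Tasks are ordered by their trigger time (the time field); the sequenceNumber field breaks ties, so tasks scheduled for the same time run in submission (FIFO) order.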

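ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result);
    this.time = ns;                                    // trigger time: primary sort key
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement(); // tie-breaker for equal times
}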
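To illustrate that ordering outside the JDK, here is a sketch with a hypothetical Entry class whose compareTo compares trigger time first and sequence number second (the same idea as ScheduledFutureTask.compareTo), stored in java.util.PriorityQueue, which is also an array-based binary heap:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicLong;

final class DelayOrderingDemo {
    private static final AtomicLong SEQUENCER = new AtomicLong();

    /** Hypothetical stand-in for ScheduledFutureTask: a name, a trigger time and a sequence number. */
    static final class Entry implements Comparable<Entry> {
        final String name;
        final long time;           // trigger time: primary sort key
        final long sequenceNumber; // submission order: breaks ties between equal times

        Entry(String name, long time) {
            this.name = name;
            this.time = time;
            this.sequenceNumber = SEQUENCER.getAndIncrement();
        }

        @Override
        public int compareTo(Entry other) {
            if (other == this) {
                return 0;
            }
            long diff = time - other.time; // subtraction, as in the JDK, tolerates nanoTime overflow
            if (diff != 0) {
                return diff < 0 ? -1 : 1;
            }
            return sequenceNumber < other.sequenceNumber ? -1 : 1;
        }
    }

    static List<String> drainOrder() {
        PriorityQueue<Entry> heap = new PriorityQueue<>();
        heap.add(new Entry("c", 300));
        heap.add(new Entry("a1", 100));
        heap.add(new Entry("b", 200));
        heap.add(new Entry("a2", 100)); // same time as a1, submitted later
        List<String> order = new ArrayList<>();
        while (!heap.isEmpty()) {
            order.add(heap.poll().name); // always removes the smallest element
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder());
    }
}
```

drainOrder() returns [a1, a2, b, c]: a1 and a2 share a trigger time, so they come out in submission order.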

In DelayedWorkQueue, take() looks at the head of the heap and returns it if the task’s getDelay() returns a value <= 0; otherwise the requesting thread blocks until the delay has elapsed.

class ScheduledFutureTask {
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - now(), NANOSECONDS);
    }
}
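The JDK also exposes the same blocking behavior publicly as java.util.concurrent.DelayQueue, whose elements implement Delayed (a getDelay(TimeUnit) method plus compareTo). A sketch with a hypothetical DelayedTask element: take() hands out each task only once its getDelay() drops to zero or below, so the 50 ms task comes out before the 200 ms one even though it was added second:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class DelayQueueDemo {

    /** Hypothetical element: becomes available `delayMillis` after it is created. */
    static final class DelayedTask implements Delayed {
        final String name;
        private final long time; // absolute trigger time in System.nanoTime() units

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.time = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // remaining time until the trigger time, like ScheduledFutureTask.getDelay
            return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Adds a 200 ms and a 50 ms task, then take()s both; returns names in the order received. */
    static List<String> takeOrder() {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("slow", 200));
        queue.put(new DelayedTask("fast", 50));
        List<String> order = new ArrayList<>();
        try {
            while (!queue.isEmpty()) {
                order.add(queue.take().name); // blocks until the head's getDelay() <= 0
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> order = takeOrder();
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(order + " after about " + elapsedMs + " ms");
    }
}
```

Running main should print the order [fast, slow] after roughly 200 ms; the exact timing depends on the scheduler.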

Some more interesting executors that are not built into Java:

  1. Netty’s memory-aware thread pool, which blocks task submission when there are too many tasks in the queue: http://netty.io/3.5/api/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.html
  2. Cassandra’s server thread pool, which makes slight improvements over the basic thread pool and registers a JMX bean to monitor and tune the pool size on the fly: http://www.grepcode.com/file/repo1.maven.org/maven2/org.apache.cassandra/cassandra-all/2.0.13/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java?av=h
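Netty’s pool limits queued tasks by their memory size. As a much simpler count-based sketch of the same idea (not Netty’s implementation; BoundedSubmissionExecutor is a hypothetical name), a Semaphore can make execute() block the caller once maxPending tasks are queued or running:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical wrapper: execute() blocks the caller while maxPending tasks are queued or running. */
final class BoundedSubmissionExecutor implements Executor {
    private final Executor delegate;
    private final Semaphore permits;

    BoundedSubmissionExecutor(Executor delegate, int maxPending) {
        this.delegate = delegate;
        this.permits = new Semaphore(maxPending);
    }

    @Override
    public void execute(Runnable command) {
        permits.acquireUninterruptibly(); // blocks the submitter once the limit is reached
        try {
            delegate.execute(() -> {
                try {
                    command.run();
                } finally {
                    permits.release(); // free a slot when the task completes
                }
            });
        } catch (RejectedExecutionException e) {
            permits.release(); // the task was never accepted, so return its permit
            throw e;
        }
    }

    int availableSlots() {
        return permits.availablePermits();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        BoundedSubmissionExecutor bounded = new BoundedSubmissionExecutor(pool, 4);
        for (int i = 0; i < 10; i++) {
            final int n = i;
            // the main thread waits here whenever 4 tasks are already pending
            bounded.execute(() -> System.out.println("task " + n + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

The wrapper leaves the delegate’s own queue unbounded, but the semaphore ensures it never holds more than maxPending tasks, which pushes back on a fast producer instead of letting memory grow.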

Sources can be found here: https://github.com/kuptservol/how-it-works/tree/master/src/main/java/ru/skuptsov/thread/pool

* The test is not as precise as it could be; it is meant as a demonstration. For more accurate measurements use JMH: http://openjdk.java.net/projects/code-tools/jmh/
