Stream API

Parallel streams in a nutshell

Javid Razzagzade
7 min read · Dec 8, 2022


The Stream API ships with one of its most powerful features: built-in concurrency support. I hope you already know what serial streams are and what they are used for. Below, I’ll try to explain a few topics about parallel streams in simple terms.

  1. What are parallel streams and how to create them?
  2. What is parallel decomposition?
  3. Processing parallel reductions.
  4. Avoiding stateful operations.

What are parallel streams and how to create them?

A parallel stream is a stream that is capable of processing elements concurrently, using multiple threads, which can give a significant performance improvement. However, with parallel streams the results may differ from serial streams. By default, the number of threads available to a parallel stream is tied to the number of CPUs in your environment, since parallel streams run on the common ForkJoinPool.
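To see what “number of CPUs” means on your own machine, you can query the runtime directly. This is a minimal sketch (the class name is mine): the common ForkJoinPool’s target parallelism is typically one less than the number of available CPUs, because the thread that invokes the terminal operation also participates in the work.

```java
import java.util.concurrent.ForkJoinPool;

public class ParallelismCheck {
    // Target parallelism of the pool that parallel streams use by default.
    public static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("available CPUs: "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: "
                + commonPoolParallelism());
    }
}
```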

Creating parallel streams

There are two simple ways to create parallel streams.
The first one is calling the parallel() method on an existing serial stream. This method is an intermediate operation that operates on the original stream: it marks the same pipeline as parallel rather than copying it.

Stream<Integer> serial = List.of(1,2,3,4).stream();
Stream<Integer> parallel = serial.parallel();

The second way is calling the parallelStream() method on an existing collection. The Collection interface includes parallelStream(), which can be called on any collection and returns a parallel stream.

Stream<Integer> parallel = List.of(1,2,3,4).parallelStream();

boolean isParallel()
The Stream interface also includes an isParallel() method, which returns whether this stream would execute in parallel during a terminal operation. Call it before invoking the terminal operation, since the pipeline may be modified or consumed by then.
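A small sketch of isParallel() in action (the class name is my own):

```java
import java.util.List;
import java.util.stream.Stream;

public class IsParallelDemo {
    public static void main(String[] args) {
        Stream<Integer> serial = List.of(1, 2, 3).stream();
        System.out.println(serial.isParallel());   // false

        // parallel() marks the same pipeline as parallel
        Stream<Integer> parallel = List.of(1, 2, 3).stream().parallel();
        System.out.println(parallel.isParallel()); // true

        System.out.println(List.of(1, 2, 3).parallelStream().isParallel()); // true
    }
}
```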

What is parallel decomposition?

A parallel decomposition is the process of taking a task, breaking it up into smaller pieces that can be performed concurrently, and then reassembling the results. The more concurrent a decomposition, the greater the performance improvement of parallel streams.

Let’s try it with a mock example. Imagine we have a run(int) method that simulates pulling some information from a database or reading a file, and returns its input as a String.

static String run(int i) {
    try {
        Thread.sleep(10_000); // simulate a slow, 10-second task
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return String.valueOf(i);
}

Now let’s call this method.

long start = System.currentTimeMillis();
List.of(1, 2, 3, 4, 5)
    .stream()
    .map(r -> run(r))
    .forEach(s -> System.out.print(s + " "));

System.out.println("time: " + (System.currentTimeMillis() - start) / 1000);
// 1 2 3 4 5
// time: 50

Results are ordered and predictable because we are using a serial stream, and the process takes about 50 seconds. What happens if we use a parallel stream? Let’s try it.

long start = System.currentTimeMillis();
List.of(1, 2, 3, 4, 5)
    .parallelStream()
    .map(r -> run(r))
    .forEach(s -> System.out.print(s + " "));

System.out.println("time: " + (System.currentTimeMillis() - start) / 1000);
// 4 5 3 1 2
// time: 10

As you can see, the results are no longer ordered, and the execution time is considerably shorter than in the previous example. Execution time depends on the number of CPUs in your environment; if your environment doesn’t have enough CPUs, the result may be 20, 30, and so on. The map() and forEach() operations on a parallel stream are equivalent to submitting multiple Runnable lambda expressions to a pooled thread executor and then waiting for the results.

If you want the results in order, you can use forEachOrdered(), which forces a parallel stream to process the results in order, at some cost in performance.

long start = System.currentTimeMillis();
List.of(5, 2, 3, 4, 1)
    .parallelStream()
    .map(r -> run(r))
    .forEachOrdered(s -> System.out.print(s + " "));

System.out.println("time: " + (System.currentTimeMillis() - start) / 1000);
// 5 2 3 4 1
// time: 10

In this code block, forEachOrdered() forces the output step into a single-threaded process. Fortunately, our map() operation is still able to take advantage of the parallel stream and perform a parallel decomposition in 10 seconds instead of 50. I prefer forEachOrdered() for operations where output order matters.

Processing parallel reductions

Reduction operations on parallel streams are referred to as parallel reductions. The result of a parallel reduction may differ from the result of the same reduction performed on a serial stream.

Performing order-based tasks

As you know, order is not guaranteed in parallel streams, so methods such as findAny() may produce unexpected results. For example:

System.out.print(List.of(1,2,3,4,5,6,7).stream().findAny().get());
// 1

Although it is not guaranteed, this code outputs the first value of the stream in almost every situation. The findAny() method is free to select any element on both serial and parallel streams.

System.out.print(List.of(1,2,3,4,5,6,7).parallelStream().findAny().get());
// it is not predictable

The result is not predictable because the JVM can create any number of threads, up to the number of available CPUs, to process this stream. When you call findAny() on a parallel stream, the JVM takes the result from whichever thread finishes first.
On the other hand, order-based operations such as findFirst(), limit(), and skip() may actually perform more slowly on a parallel stream, because they force the threads to coordinate in a synchronized-like fashion. As you might guess, the result of such an operation is the same as on a serial stream: for example, calling skip(5).limit(2).findFirst() will return the same result on ordered serial and parallel streams.
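To sketch that last claim, the pipeline below returns the same element whether the source stream is serial or parallel (the class name is my own):

```java
import java.util.List;

public class OrderedOpsDemo {
    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 3, 4, 5, 6, 7, 8);

        // skip(5) drops 1..5, limit(2) keeps 6 and 7, findFirst() picks 6.
        int serial = data.stream().skip(5).limit(2).findFirst().get();
        int parallel = data.parallelStream().skip(5).limit(2).findFirst().get();

        System.out.println(serial);   // 6
        System.out.println(parallel); // 6
    }
}
```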

Unordered streams

Streams are ordered by nature, but if you want, you can create an unordered stream from an ordered one using the unordered() method.

List.of(1,2,3,4,5).stream().unordered();

This method does not reorder the elements of the stream. It just tells the JVM that when it sees an order-based stream operation, it may ignore ordering. For example, if you call skip(5) on an unordered stream, it skips any 5 elements, not necessarily the first 5. This gives us a performance improvement with parallel streams; for serial streams, using the unordered version has no effect.
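As a sketch, the helper below (the class and method names are my own) skips two elements from an unordered parallel stream. Exactly three elements survive, but which three may vary from run to run:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class UnorderedDemo {
    // On an unordered parallel stream, skip(2) may drop ANY two elements.
    public static Set<Integer> skipTwoUnordered() {
        return List.of(1, 2, 3, 4, 5)
                .stream()
                .unordered()
                .parallel()
                .skip(2)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println(skipTwoUnordered()); // three of the five elements
    }
}
```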

Combining Results with reduce()

I hope you remember the reduce() method. Although the one- and two-argument versions of reduce() do support parallel processing, it is recommended that you use the three-argument version when working with parallel streams. Providing an explicit combiner method allows the JVM to partition the operations in the stream more efficiently.
A brief reminder of its signature:

<U> U reduce(
    U identity,
    BiFunction<U, ? super T, U> accumulator,
    BinaryOperator<U> combiner
);

and code example:

String result = List.of('a', 'e', 'n', 'd')
    .parallelStream()
    .reduce(
        // identity
        "",
        // accumulator
        (s1, c) -> s1 + c,
        // combiner
        (s2, s3) -> s2 + s3
    );
System.out.println(result);
// aend

As you might guess, this process runs in parallel: the JVM divides the work among several threads. In a serial stream, ‘aend’ is built one character at a time; in a parallel stream, the intermediate values ‘ae’ and ‘nd’ are created and then combined. You might think the result could be ‘dnea’, ‘nead’, or something like that, but if you are worried about ordering, don’t be: the JVM prevents it. You should, however, follow two rules. First, make sure the accumulator and combiner produce the same result regardless of the order in which they are called (they must be associative). Second, choose the identity parameter carefully: combining it with any element must leave the element unchanged. For example, if we add numbers, we can do so in any order. Let’s see an example that creates a problem for us.

System.out.println(
    List.of(1, 2, 3, 4, 5, 6, 7)
        .parallelStream()
        .reduce(0, (x, y) -> (x - y))
);
// Subtraction is not associative, so the accumulator rule is broken.
// It may output -16, -26, 2, or some other value.

System.out.println(
    List.of("a", "e", "n", "d")
        .parallelStream()
        .reduce("5", String::concat)
);
// "5" is not a true identity, so the identity rule is broken.
// It may output something like 5a5e5n5d.
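For contrast, here is a reduction that follows both rules: addition is associative, and 0 really is an identity for it, so the result is stable on serial and parallel streams alike (the class and method names are mine):

```java
import java.util.List;

public class SafeReduceDemo {
    // Associative accumulator + true identity = same answer in parallel.
    public static int sum(List<Integer> values) {
        return values.parallelStream().reduce(0, Integer::sum, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(sum(List.of(1, 2, 3, 4, 5, 6, 7))); // 28
    }
}
```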

Combining Results with collect()

Like reduce(), the collect() method has a similar three-argument signature.

<R> R collect(
    Supplier<R> supplier,
    BiConsumer<R, ? super T> accumulator,
    BiConsumer<R, R> combiner
);

Let’s see an example.

Stream<String> stream = Stream.of("a", "e", "n", "d").parallel();
SortedSet<String> sortedSet = stream.collect(
    // supplier
    ConcurrentSkipListSet::new,
    // accumulator
    Set::add,
    // combiner
    Set::addAll
);
System.out.println(sortedSet);
// [a, d, e, n]  (a ConcurrentSkipListSet keeps its elements sorted)

When combining results on a parallel stream, you should use a concurrent collection, ensuring that writes from concurrent threads do not cause a ConcurrentModificationException.
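Another option is the family of concurrent collectors in the Collectors class. For example, Collectors.groupingByConcurrent produces a CONCURRENT, unordered collector, so a parallel stream can accumulate into one shared ConcurrentMap instead of merging per-thread containers. A sketch (the class and method names below are my own):

```java
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

public class ConcurrentCollectorDemo {
    // Groups words by length into a thread-safe map.
    public static ConcurrentMap<Integer, List<String>> byLength(List<String> words) {
        return words.parallelStream()
                .collect(Collectors.groupingByConcurrent(String::length));
    }

    public static void main(String[] args) {
        System.out.println(byLength(List.of("a", "bb", "cc", "ddd")));
    }
}
```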

Avoiding stateful operations

Before starting, we have to know what stateful and stateless lambda expressions are, because side effects can appear in parallel streams if your lambda expressions are stateful.
A stateful lambda expression is one whose result depends on state that might change during the execution of a pipeline. A stateless lambda expression, on the other hand, is one whose result does not depend on any such state.
Let’s try an example:

public static List<Integer> add(IntStream numbers) {
    var oddNumbers = Collections.synchronizedList(new ArrayList<Integer>());
    numbers
        .filter(n -> n % 2 == 1)
        .forEach(i -> oddNumbers.add(i)); // Stateful. You should avoid this.
    return oddNumbers;
}

and,

var oddNumbers = add(IntStream.range(1, 11));
System.out.println(oddNumbers);

// [1, 3, 5, 7, 9]

What happens if someone passes a parallel stream to our add() method? With a parallel stream, the order of the output becomes unpredictable.

var oddNumbers = add(IntStream.range(1, 11).parallel());
System.out.println(oddNumbers);

// [5, 1, 7, 9, 3] or something like that.

The first thing that comes to mind is using forEachOrdered(). But that forces the stream to work in a synchronized-like fashion, effectively making the parallel stream serial, which, as you know, is not recommended.

We can fix this by rewriting our stream operation so that it no longer has a stateful lambda expression.

public static List<Integer> add(IntStream numbers) {
    return numbers
        .filter(n -> n % 2 == 1)
        .boxed()
        .collect(Collectors.toList()); // collect() handles accumulation safely, even in parallel
}

This method produces the same result on both serial and parallel streams.
In my opinion, you should avoid stateful lambda expressions whether you use serial or parallel streams, because a serial stream may be changed to a parallel one in the future.

Thank you for reading.
