Java Streams and Lambdas — Part 2

In the first part we saw how to create streams, apply operations to them and replace anonymous inner classes with lambdas. Let's look at more applications of streams and lambdas.

Coding to method signatures instead of object types:

Let’s look at filtering even numbers again.

import java.util.Arrays;
import java.util.List;

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
numbers.stream()
    .filter(number -> number % 2 == 0) // keep only the even numbers
    .forEach(number -> System.out.print(number));
//Output : 246
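To make the link between the lambda and Predicate visible, here is a sketch of the same filter written three ways: with an anonymous Predicate class, with a lambda stored in a Predicate<Integer> variable, and with the lambda inline. The class and method names (PredicateDemo and friends) are placeholders for this example only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class PredicateDemo {
    static final List<Integer> NUMBERS = Arrays.asList(1, 2, 3, 4, 5, 6);

    // 1. An explicit Predicate implementation, the pre-Java 8 way
    static List<Integer> withAnonymousClass() {
        Predicate<Integer> isEven = new Predicate<Integer>() {
            @Override
            public boolean test(Integer number) {
                return number % 2 == 0;
            }
        };
        return NUMBERS.stream().filter(isEven).collect(Collectors.toList());
    }

    // 2. The same Predicate, created from a lambda
    static List<Integer> withLambdaVariable() {
        Predicate<Integer> isEven = number -> number % 2 == 0;
        return NUMBERS.stream().filter(isEven).collect(Collectors.toList());
    }

    // 3. The lambda passed inline; the compiler infers Predicate<Integer> from filter's parameter type
    static List<Integer> withInlineLambda() {
        return NUMBERS.stream().filter(number -> number % 2 == 0).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(withAnonymousClass()); // [2, 4, 6]
        System.out.println(withLambdaVariable()); // [2, 4, 6]
        System.out.println(withInlineLambda());   // [2, 4, 6]
    }
}
```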

We know filter takes a Predicate<Integer>, yet we never created a Predicate object ourselves. We passed a lambda, and the compiler turned it into a Predicate because Predicate is a functional interface (an interface with a single abstract method, test). So can we pass a different function with the same shape? To find out, I created an EvenChecker class with a method checkEven that tells us whether a number is even.

public class EvenChecker {
    public boolean checkEven(int number) {
        return number % 2 == 0;
    }
}

Look at the signature of checkEven: it takes an integer and returns a boolean, which is the same shape as the test method of Predicate<Integer>. So we can let an EvenChecker instance do the test inside filter.

EvenChecker evenChecker = new EvenChecker();
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
numbers.stream()
    .filter(number -> evenChecker.checkEven(number)) // EvenChecker does the test
    .forEach(number -> System.out.print(number));
//Output : 246

We don't care about the exact object type; what we care about is what the method does. Notice that EvenChecker does not implement the Predicate interface and is not connected to Predicate in any way, except that its checkEven method has the same shape as Predicate's test method.
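Because only the shape of checkEven matters, we can go one step further and pass it as a method reference, or even store it in a Predicate<Integer> variable. A minimal sketch, repeating EvenChecker so the example compiles on its own; MethodSignatureDemo and evens are placeholder names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class EvenChecker {
    public boolean checkEven(int number) {
        return number % 2 == 0;
    }
}

public class MethodSignatureDemo {
    static List<Integer> evens(List<Integer> numbers) {
        EvenChecker evenChecker = new EvenChecker();
        // checkEven(int) -> boolean fits Predicate<Integer>.test(Integer) -> boolean
        // (the Integer is unboxed to int), so the method reference is accepted as a Predicate
        Predicate<Integer> isEven = evenChecker::checkEven;
        return numbers.stream().filter(isEven).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(evens(Arrays.asList(1, 2, 3, 4, 5, 6))); // [2, 4, 6]
    }
}
```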

Higher-order functions:

Functions that accept other functions as parameters (or return them) are called higher-order functions. To my object-oriented brain, this was perplexing. How can a function be passed as an argument? Where exactly would this be beneficial? When I looked closely at my code, I saw places where I could extract a method and avoid duplicating code, if only I could pass a function in as a parameter.
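As a tiny illustration, here is a sketch of a higher-order function in Java: applyTwice (a made-up name) takes a Function<Integer, Integer> as a parameter and calls it twice. We pass it two different functions without changing applyTwice itself.

```java
import java.util.function.Function;

public class HigherOrderDemo {
    // applyTwice is higher-order: one of its parameters is itself a function
    static int applyTwice(Function<Integer, Integer> function, int value) {
        return function.apply(function.apply(value));
    }

    public static void main(String[] args) {
        System.out.println(applyTwice(x -> x + 3, 10)); // 16
        System.out.println(applyTwice(x -> x * x, 3));  // 81
    }
}
```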

At work, we get data on active and inactive users and we are supposed to handle each case differently. Let’s look at a few solutions.

The first one is to iterate over each user, check if they are active or inactive and deal with those appropriately.

List<User> users = ... // List of users
for (User user : users) {
    if (user.active) {
        //doSomething with active users
    } else {
        //doSomething with inactive users
    }
}

This can be improved slightly by delegating the decision to a helper method.

List<User> users = ... // List of users
for (User user : users) {
    doSomethingWithUser(user); // the helper decides what to do
}

private void doSomethingWithUser(User user) {
    if (user.active) {
        //doSomething with active users
    } else {
        //doSomething with inactive users
    }
}

This solution is still not ideal: the external loop drives the iteration itself and changes state as it goes, and we like immutability in our programs. Let's look at a solution using streams.

List<User> users = ... // List of users
users.stream()
    .filter(user -> user.active)
    .forEach(Analytics::doSomethingWithActiveUser);
users.stream()
    .filter(user -> !user.active)
    .forEach(Analytics::doSomethingWithInactiveUser);

Now we have immutability, but we also have code duplication. We can fix that by looking at what is common and what is not. The List<User>, the filter step and the forEach step are common; only the arguments to filter and forEach change. We know that filter takes a Predicate and forEach takes a Consumer. A Consumer takes an input and returns nothing (it consumes the input). So let's move the common part into a new method and pass the Predicate and the Consumer in as parameters.

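import java.util.function.Consumer;
import java.util.function.Predicate;

private void doSomethingWithUsers(List<User> users, Predicate<User> predicate, Consumer<User> consumer) {
    users.stream()
        .filter(predicate)   // keep only the users the caller is interested in
        .forEach(consumer);  // run the caller's action on each of them
}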

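How do we use this method? A Predicate takes an input and returns a boolean, so as the second parameter we pass a lambda that takes a User and returns whether it is active. A Consumer consumes its input, so as the third parameter we pass a method that takes a User and does the work.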

List<User> users = ... // List of users
//Method call for active users
doSomethingWithUsers(users, user -> user.active, Analytics::doSomethingWithActiveUser);
//Method call for inactive users
doSomethingWithUsers(users, user -> !user.active, Analytics::doSomethingWithInactiveUser);

doSomethingWithActiveUser(User user) and doSomethingWithInactiveUser(User user) are static helper methods in the Analytics class. Each element of the stream is a User and each of these methods takes a single User, so they fit Consumer<User> and we can pass them as method references.
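Here is a self-contained sketch of the whole pattern. The User fields and the bodies of the two Analytics helpers are assumptions made for illustration (the post does not show them); the helpers just record user names so the result is easy to check. doSomethingWithUsers is made static here so main can call it directly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

class User {
    final String name;    // hypothetical field, only used to make the output readable
    final boolean active;

    User(String name, boolean active) {
        this.name = name;
        this.active = active;
    }
}

class Analytics {
    // Hypothetical stand-ins for the real per-user work. They are static,
    // which is what lets Analytics::doSomethingWith... fit Consumer<User>
    static final List<String> activeSeen = new ArrayList<>();
    static final List<String> inactiveSeen = new ArrayList<>();

    static void doSomethingWithActiveUser(User user) {
        activeSeen.add(user.name);
    }

    static void doSomethingWithInactiveUser(User user) {
        inactiveSeen.add(user.name);
    }
}

public class UserProcessing {
    // Higher-order method: which users to pick and what to do with them are both passed in
    static void doSomethingWithUsers(List<User> users, Predicate<User> predicate, Consumer<User> consumer) {
        users.stream()
            .filter(predicate)
            .forEach(consumer);
    }

    public static void main(String[] args) {
        List<User> users = Arrays.asList(
            new User("ann", true), new User("bob", false), new User("cid", true));

        doSomethingWithUsers(users, user -> user.active, Analytics::doSomethingWithActiveUser);
        doSomethingWithUsers(users, user -> !user.active, Analytics::doSomethingWithInactiveUser);

        System.out.println(Analytics.activeSeen);   // [ann, cid]
        System.out.println(Analytics.inactiveSeen); // [bob]
    }
}
```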

Parallel Streams:

This is the last topic we’re gonna look at today.

I'd like to get 10 users from GitHub and find the name of each one's first repo. The task is simple enough: hit the GitHub API for users (https://api.github.com/users), then iterate through the first 10 users and get each one's first repo name.

I have a simple User class with one property, reposUrl, which holds the URL for fetching all of the user's repos.
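The post doesn't show User or apiRequest, so here is one possible sketch, assuming Java 11+ and the built-in java.net.http.HttpClient. User just holds the repos_url value from the GitHub users response, and apiRequest (a hypothetical helper) returns the response body as a String. apiRequest wraps the checked IOException and InterruptedException because a lambda passed to map cannot throw checked exceptions. getFirstRepoName is left out, since parsing the JSON depends on which JSON library you use.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class User {
    private final String reposUrl; // e.g. the "repos_url" field of a GitHub user

    User(String reposUrl) {
        this.reposUrl = reposUrl;
    }

    String getReposUrl() {
        return reposUrl;
    }
}

public class GithubClient {
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    // Hypothetical helper: GET the URL and return the body. Checked exceptions are
    // wrapped so the method can be called from inside stream lambdas such as map(...)
    static String apiRequest(String url) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        try {
            return CLIENT.send(request, HttpResponse.BodyHandlers.ofString()).body();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("Request interrupted: " + url, e);
        }
    }
}
```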

We’ll look at an imperative solution and a functional solution.

List<User> users = getUsers();
long startTime = System.nanoTime();
for (User user : users) {
    String repos = apiRequest(user.getReposUrl()); // response listing the user's repos
    System.out.println(getFirstRepoName(repos));
}
long endTime = System.nanoTime();
System.out.println("Imperative : " + (endTime - startTime) / 1e9); // nanoseconds to seconds

The solution should look familiar. We loop through each user, get their first repo name and print it out. We measure the time taken in nanoseconds and convert it to seconds. Let's see the output.
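The same start/end timing code surrounds every version, which makes it a candidate for the higher-order treatment from earlier. A small sketch: timeInSeconds (a made-up helper) takes the code to measure as a Runnable and returns the elapsed time in seconds, using the same System.nanoTime() arithmetic as above.

```java
public class Timing {
    // Runs the given code once and returns the elapsed wall-clock time in seconds
    static double timeInSeconds(Runnable code) {
        long startTime = System.nanoTime();
        code.run();
        long endTime = System.nanoTime();
        return (endTime - startTime) / 1e9;
    }

    public static void main(String[] args) {
        double seconds = timeInSeconds(() -> {
            long sum = 0;
            for (int i = 0; i < 1_000_000; i++) {
                sum += i;
            }
            System.out.println(sum);
        });
        System.out.println("Loop : " + seconds);
    }
}
```

Each measurement in the post could then be written as a single call that wraps the loop or the stream pipeline in a lambda.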

30daysoflaptops.github.io
ace
auto_migrations
abbot-from-scratch
acl_system2
adventure_cats
ace
amqp
afero
active_admin
Imperative : 98.182267084

Wow, 98 seconds. Looks like GitHub's servers are under a lot of load today. When I ran the same code yesterday, it took around 5 seconds.

Now, let’s look at a functional solution using streams.

startTime = System.nanoTime();
users.stream()
    .map(user -> apiRequest(user.getReposUrl())) // fetch each user's repos
    .map(ParallelStreams::getFirstRepoName)      // keep only the first repo name
    .forEach(System.out::println);

endTime = System.nanoTime();
System.out.println("Functional : " + (endTime - startTime) / 1e9);

This time the stream's internal iterator does the looping: for every user we fetch the list of repos, map it to the first repo name and print it, then we measure the time taken. We get the same output and a similar time.

30daysoflaptops.github.io
ace
auto_migrations
abbot-from-scratch
acl_system2
adventure_cats
ace
amqp
afero
active_admin
Functional : 108.874898578

If we wanted to parallelize the imperative code, we would have to do it ourselves: spawn threads, manage them, make sure they are synchronized and so on.
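For comparison, here is roughly what doing it ourselves looks like with an ExecutorService from java.util.concurrent. It is a sketch under assumptions: slowRequest is a hypothetical stand-in for apiRequest that just sleeps for 100 ms, and the pool size of 10 is arbitrary. Even this small version has to create the pool, submit one task per item, collect the Futures, deal with their exceptions and shut the pool down.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ManualParallelism {
    // Hypothetical stand-in for a slow network call such as apiRequest
    static String slowRequest(String url) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "response for " + url;
    }

    static List<String> fetchAll(List<String> urls) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(10); // we choose and manage the threads
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String url : urls) {
                futures.add(pool.submit(() -> slowRequest(url))); // one task per URL
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get()); // waits for that task; results stay in input order
            }
            return results;
        } finally {
            pool.shutdown(); // pool threads are non-daemon, so forgetting this keeps the JVM alive
        }
    }

    public static void main(String[] args) throws Exception {
        fetchAll(Arrays.asList("a", "b", "c")).forEach(System.out::println);
    }
}
```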

Let’s see how we can parallelize streams in Java.

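users.parallelStream() // same pipeline, but elements are processed on multiple threads
    .map(user -> apiRequest(user.getReposUrl()))
    .map(ParallelStreams::getFirstRepoName)
    .forEach(System.out::println); // no ordering guarantee on a parallel stream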

That's it. Change stream() to parallelStream() and you're all set.

Let’s look at the output generated using parallel streams.

auto_migrations
acl_system2
ace
afero
ace
active_admin
abbot-from-scratch
30daysoflaptops.github.io
adventure_cats
amqp
Functional : 38.71021781

The order of the output has changed, and so has the time taken. That's because the elements were processed in parallel, and forEach on a parallel stream does not guarantee any particular order.
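If the printed order matters, it is the terminal operation that needs to change, not the stream. A sketch, with simulatedRequest as a hypothetical stand-in for apiRequest: map still runs in parallel, but forEachOrdered prints the results in the list's encounter order, and collecting with Collectors.toList() preserves that order too. forEachOrdered may give back some of the parallel speed-up in the terminal step, because it has to wait for earlier elements before printing later ones.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

public class OrderedParallel {
    // Hypothetical stand-in for apiRequest: sleeps briefly to mimic network latency
    static String simulatedRequest(String name) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name.toUpperCase(Locale.ROOT);
    }

    // Collecting into a List keeps encounter order, even for a parallel stream
    static List<String> fetchInOrder(List<String> names) {
        return names.parallelStream()
            .map(OrderedParallel::simulatedRequest)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("ace", "amqp", "afero", "active_admin");

        // map runs in parallel, but forEachOrdered prints in the list's order
        names.parallelStream()
            .map(OrderedParallel::simulatedRequest)
            .forEachOrdered(System.out::println);

        System.out.println(fetchInOrder(names)); // [ACE, AMQP, AFERO, ACTIVE_ADMIN]
    }
}
```

Keep in mind that parallel streams run on the shared common ForkJoinPool, whose default parallelism is one less than the number of available processors (the calling thread also helps), so for blocking work such as network requests the speed-up is bounded by the core count rather than by the number of users.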

The times I got when everything was running fast were:

Imperative code : 5.2 seconds

Functional (not parallel) : 5.3 seconds

Functional (parallel) : 2.5 seconds

That’s it for my intro to Streams and Lambdas.

You can find code samples on my GitHub:

Feel free to suggest improvements to my code or leave any other feedback in the comments.

Thanks,

AJ