Coroutines and RxJava — An Asynchronicity Comparison (Part 6): Threading

Manuel Vivo
Capital One Tech
Published in
8 min readJul 17, 2018

--

Introduction

In this blog series I will compare Kotlin Coroutines and RxJava since they are both trying to solve a common problem in Android development: Asynchronous Programming.

We’ve gone through a lot of different topics as part of this series: asynchronous programming, cancelling execution, transferring stream of values, the interop library and operators.

For the latest topic we will compare is Threading!

Slow Rendering

In Android apps, if you want to have a responsive UI, you will need to control in what threads your code is executed.

You don’t want your app to skip frames and give a bad user experience. This is called Slow rendering and usually happens when you have a lot of processing going on in the Android UI thread.

All UI operations must run on the UI thread (e.g. setting the text of a TextView) but other operations don’t. That’s why we want to reduce the load on the UI thread (for a smooth rendering) and run non-UI operations in background threads.

Note that if you’re concerned that you’re doing too much work on the UI thread, there are different ways of profiling your app with Android Studio.

Threading in RxJava

Threading in RxJava is controlled by two operators: subscribeOn() and observeOn().

In a nutshell, subscribeOn specifies the Scheduler on which an Observable will execute its .create method and observeOn specifies the Scheduler on which an observer will consume the items emitted by the Observable.

A Scheduler is a tool that schedules actions to be executed. There are some pre-defined Schedulers, but you can also create your own one passing an ExecutorService (which can contain your own ThreadPool).

Let’s see a piece of RxJava code. We’ll make use of these operators while transforming the element we send with a Single.zip operator.

RxJava code to study

Let’s break all of this down. You might have noticed the subscribeOn() operator.

subscribeOn defines on which Scheduler the Observable will execute its create method

It defines on which Scheduler the Single.create will be executed on. In this case, Schedulers.computation().

Note that all RxJava operators that create Observables (i.e. Single.just, Single.zip, etc) internally use Single.create.

The emission of the item will happen on Schedulers.computation()

After emitting that item, we’ll make transformations to that object. To perform them in different threads, we have to observe the items on different Schedulers. We achieve that with the observeOn() operator.

Note that we use the observeOn operator before applying the transformation and subscribeOn after creating the Observable

Usage of observeOn to execute transformations/observe items in different threads

The first transformation consists of emitting the square of that number and we want that to happen on Schedulers.io().

Note that this is not something that you would do in your app. I’m just swapping to different Schedulers to show how we can do this and how threading works in RxJava.

Transforming the item on Schedulers.io()

As you can see, we’re calling observeOn(Schedulers.io()) to decide on which Scheduler we want to consume the item. Then, we use the flatMap operator to return a Single that emits the square of the number we received.

Note that a map operator would have also worked here to transform the items. I’m using flatMap to show the power of threading in RxJava.

The next transformation will emit the number we receive (the square of the original number that we got from the previous transformation) minus 1. Since we swap to Schedulers.computation() again, the transformation will be executed on that Scheduler.

Transforming the item on Schedulers.computation()

Now we want to consume the item. Since we will hypothetically make some UI changes, we want that to happen in the Android Main Thread. As before, we use the observeOn() operator to pass in AndroidSchedulers.mainThread().

AndroidSchedulers.mainThread() is an Android-specific Scheduler part of RxAndroid.

Consuming the item in the Android Main Thread

In the example, we’re printing to console the item we consumed. Since that’s happening in the Android UI Thread, we could have also modified any UI widget on the screen.

At this point, I think we now know enough about threading in RxJava to compare it with threading in Coroutines.

Threading in Coroutines

Threading in Coroutines is defined in the Coroutine Context. In previous articles, we said that the Coroutine Context is a set of user-defined objects. Threading is defined in the value for the key ContinuationInterceptor.

Doing something like coroutineContext[ContinuationInterceptor] = threadingPolicy will change the threading policy for that particular coroutine.

That value is of type CoroutineDispatcher. You can have a CoroutineDispatcher with a single thread or a thread pool. As with RxJava threading, Coroutines come with some pre-defined values that you can use out of the box. Some of them are:

  • CommonPool. This is similar to Schedulers.computation() in RxJava. The small difference is that computation() is a pool of size number of CPUs and CommonPool is a pool of size number of CPUs minus 1 (which means for dual core devices, you only get 1 thread) as Nick Capurso calls out in his post.
  • UI (Android). Dispatches execution on Android main UI thread.
  • Unconfined. Be careful and read the documentation if you want to use this feature. If the Coroutine goes through a suspension point, it might resume on a different CoroutineDispatcher.

Schedulers.computation is a pool of size = CPUs and CommonPool is a pool of size = CPUs — 1

Creating Your Own CoroutineDispatcher

You can use newSingleThreadContext to create a CoroutineDispatcher with a single thread.

val coroutineDispatcher = newSingleThreadContext("ThreadName")

And you can also use newFixedThreadPoolContext to create a CoroutineDispatcher with a fixed-size thread pool.

val coroutineDispatcher = newFixedThreadPoolContext(4, "PoolName")

That will create a CoroutineDispatcher with a thread pool made of four threads.

Threading in Practice

Now it’s time to take our RxJava example and go through a similar example using Coroutines.

Let’s take a look at the following Coroutine:

Coroutine code to study

As you can see, in the first line we start a Coroutine with the CoroutineBuilder launch and we pass CommonPool as the CoroutineContext. That’s overriding the ContinuationInterceptor value of the coroutine we just created. That will make our Coroutine to run in the background.

The code in that Coroutine will be executed in background threads.

CommonPool is the default value for the ContinuationInterceptor key in the CoroutineContext.

Using the CoroutineContext of the parent coroutine

In the following lines we’re creating two other coroutines to imitate what we did with the Single.zip in the RxJava code. Then, we just wait for both coroutines to finish, sum the numbers they return and assign it to the result variable.

Where is the code of the new two coroutines executed on? As you can see, we’re passing coroutineContext as a parameter to the async CoroutineBuilder. coroutineContext is a variable that every coroutine has and refers to the context of the parent coroutine, i.e. the context of the coroutine we created with launch in the first line. Therefore, the async coroutines effectively use the CoroutineContext of the parent coroutine: CommonPool.

The code will be executed in the CommonPool since we use the parent CoroutineContext

All that code is going to be executed in the CommonPool, since that’s the ContinuationInterceptor value of the main coroutine.

Use of newSingleThreadContext

What’s next? We want to apply the transformations just like we did in the RxJava code. We’re creating another Coroutine to transform the value of the variable result. For that, we use the CoroutineBuilder launch and we call .join() so the parent coroutine waits for this coroutine to complete.

As you can see, we’re using newSingleThreadContext, that will create a new thread to execute the code inside this coroutine.

The coroutine will be executed in a new thread we just created to run it

The code inside that Coroutine will be executed in a new thread called CustomThread.

Another way of swapping threads with Coroutines

For the last example we use withContext. If we hypothetically wanted to run the code in the Android UI main thread to make some UI changes, we would have to use the UI CoroutineDispatcher.

withContext is a suspending function that will run the suspending code inside it on a different context. The parent coroutine will wait for this to return before continuing execution.

The code is executed in the context defined as a parameter

The code in that suspending function will be executed in the Android UI thread.

withContext vs launch.join

When do you have to use withContext vs launch{}.join()? In both cases, the parent coroutine calling these methods will wait for them to finish. However, launch will create a new coroutine and withContext won’t. launch will add the extra cost of creating a new coroutine.

At this point, I’d make my decision based on readability. If the task to perform is related to the coroutine, I’d use withContext. If it’s unrelated, launch.

Conclusion

This series is coming to an end and this is the last comparison we’ll do on a specific topic. We have seen how both libraries offer a solution for every aspect of asynchronous programming.

A few closing points I’d like to make:

  • There isn’t a winner/loser between RxJava and Coroutines. Use what fits your project and what you like most. This may vary and will be unique to you and your project.
  • Both libraries provide a good way to do asynchronous programming.
  • If you are a RxJava expert, no need to switch. You can continue what you’re doing.
  • If you struggle with RxJava or are a new Android developer, Coroutines is another option you can try. Coroutines is supposed to have a lower learning curve.

More Education

Articles about RxJava 2

Previous parts of this series

Thanks for reading,

Manuel Vicente Vivo

--

--