Room and RxJava — easily linked together (Source)

Room 🔗 RxJava

Doing queries in Room with RxJava

Less boilerplate code, compile-time checked SQL queries, and on top of this, the power of asynchronous and observable queries — how does that sound? All of these are possible with Room, the persistence library from the Architecture Components. Async queries return LiveData or RxJava’s Maybe, Single or Flowable. The queries returning LiveData and Flowable are observable queries. They allow you to get automatic updates whenever the data changes to make sure your UI reflects the latest values from your database. If you’re already working with RxJava 2 in your app, then using Room together with Maybe, Single and Flowable will be a breeze.

Let’s consider the following UI: the user is able to see and edit their username. This, together with other info about the user, is saved in the database.

To get the user from the database, we could write the following query in the data access object class (UserDao):

@Query(“SELECT * FROM Users WHERE id = :userId”)
User getUserById(String userId);

This approach has two disadvantages:

  1. It is a blocking, synchronous call
  2. We need to manually call this method every time our user data is modified

Room provides the option of observing the data in the database and performing asynchronous queries with the help of RxJava Maybe, Single and Flowable objects.

If you’re worried about threads, Room keeps you at ease and ensures that observable queries are done off the main thread. It’s up to you to decide on which thread the events are emitted downstream, by setting the Scheduler in the observeOn method.

For queries that return Maybe or Single, make sure you’re calling subscribeOn with a different Scheduler than AndroidSchedulers.mainThread().

To start using Room with RxJava 2, just add the following dependencies to your build.gradle file:

// RxJava support for Room
implementation “”
// Testing support
androidTestImplementation “android.arch.core:core-testing:1.0.0-alpha5”


@Query(“SELECT * FROM Users WHERE id = :userId”)
Maybe<User> getUserById(String userId);

Here’s what happens:

  1. When there is no user in the database and the query returns no rows, Maybe will complete.
  2. When there is a user in the database, Maybe will trigger onSuccess and it will complete.
  3. If the user is updated after Maybe was completed, nothing happens.


@Query(“SELECT * FROM Users WHERE id = :userId”)
Single<User> getUserById(String userId);

Here are some scenarios:

  1. When there is no user in the database and the query returns no rows, Single will trigger onError(EmptyResultSetException.class)
  2. When there is a user in the database, Single will trigger onSuccess.
  3. If the user is updated after Single.onComplete was called, nothing happens, since the stream was completed.


@Query(“SELECT * FROM Users WHERE id = :userId”)
Flowable<User> getUserById(String userId);

Here’s how the Flowable behaves:

  1. When there is no user in the database and the query returns no rows, the Flowable will not emit, neither onNext, nor onError.
  2. When there is a user in the database, the Flowable will trigger onNext.
  3. Every time the user data is updated, the Flowable object will emit automatically, allowing you to update the UI based on the latest data.


Testing a query that returns a Maybe/Single/Flowable is not very different from testing its synchronous equivalent. In the UserDaoTest, we make sure that we use an in-memory database, since the information stored here is automatically cleared when the process is killed.

public class UserDaoTest {

private UsersDatabase mDatabase;
public void initDb() throws Exception {
mDatabase = Room.inMemoryDatabaseBuilder(
// allowing main thread queries, just for testing

public void closeDb() throws Exception {

Add the InstantTaskExecutorRule rule to your test, to make sure that Room executes all the database operations instantly.

public InstantTaskExecutorRule instantTaskExecutorRule =
new InstantTaskExecutorRule();

Let’s implement a test that subscribes to the emissions of getUserById and checks that indeed when the user was inserted, the correct data is emitted by the Flowable.

public void insertAndGetUserById() {
// Given that we have a user in the data source
    // When subscribing to the emissions of user
// assertValue asserts that there was only one emission
.assertValue(new Predicate<User>() {
public boolean test(User user) throws Exception {
// The emitted user is the expected one
return user.getId().equals(USER.getId()) &&

That’s it! If you use RxJava 2 in your app, make your database reactive too and ensure that your UI always shows the latest data. Check out a sample app using Room and RxJava here.