Room 🔗 RxJava

Doing queries in Room with RxJava

Less boilerplate code, compile-time checked SQL queries, and on top of this, the power of asynchronous and observable queries — how does that sound? All of these are possible with Room, the persistence library from the Architecture Components. Async queries return LiveData or RxJava’s Maybe, Single or Flowable. The queries returning LiveData and Flowable are observable queries. They allow you to get automatic updates whenever the data changes to make sure your UI reflects the latest values from your database. If you’re already working with RxJava 2 in your app, then using Room together with Maybe, Single and Flowable will be a breeze.

Later edit: starting with `2.0.0-beta01`, Room also supports Observable.

Later edit 2: starting with Room 2.1.0-alpha01, DAO methods annotated with @Insert, @Delete or @Update support Rx return types Completable, Single<T> and Maybe<T>.

Let’s consider the following UI: the user is able to see and edit their username. This, together with other info about the user, is saved in the database. Here’s how to insert, update, delete and query the user.

Insert

Room’s RxJava integration supports the following return types for insert:

  • Completable: onComplete is called once the insertion has finished
  • Single<Long> or Maybe<Long>: the value emitted in onSuccess is the row id of the inserted item
  • Single<List<Long>> or Maybe<List<Long>>: the value emitted in onSuccess is the list of row ids of the inserted items

In case of error inserting the data, Completable, Single and Maybe will emit the exception in onError.

@Insert
Completable insert(User user);
// or
@Insert
Maybe<Long> insert(User user);
// or
@Insert
Single<Long> insert(User user);
// or
@Insert
Maybe<List<Long>> insert(User[] user);
// or
@Insert
Single<List<Long>> insert(User[] user);

Nothing is inserted until you subscribe. Use subscribeOn to specify the Scheduler on which the insert runs, and observeOn to specify the Scheduler on which the subscriber receives the result.
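As a minimal sketch of how the two operators fit together for an insert: the snippet assumes the Completable-returning insert(User) variant from the DAO above and the RxAndroid AndroidSchedulers class. UserWriter, save and onSaved are hypothetical names used only for illustration; they are not part of the original sample.

```java
import android.util.Log;

import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

// Hypothetical wrapper class, for illustration only.
// UserDao and User are the app's own types described in this article.
final class UserWriter {

    private final UserDao userDao;

    UserWriter(UserDao userDao) {
        this.userDao = userDao;
    }

    // Assumes the DAO declares: @Insert Completable insert(User user);
    Disposable save(User user, Runnable onSaved) {
        return userDao.insert(user)                         // nothing runs until subscribe()
                .subscribeOn(Schedulers.io())               // execute the INSERT on an I/O thread
                .observeOn(AndroidSchedulers.mainThread())  // deliver callbacks on the main thread
                .subscribe(
                        onSaved::run,                       // onComplete: the row was written
                        error -> Log.e("UserWriter", "Insert failed", error));
    }
}
```

Keep the returned Disposable and dispose it when the screen goes away, so the callback cannot touch a destroyed view.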

Update/Delete

Room’s RxJava integration supports the following return types for update/delete:

  • Completable: onComplete is called once the update/delete has finished
  • Single<Integer> or Maybe<Integer>: the value emitted in onSuccess is the number of rows affected by the update/delete

@Update
Completable update(User user);
// or
@Update
Single<Integer> update(User user);
// or
@Update
Single<Integer> updateAll(User[] user);
// or
@Delete
Single<Integer> deleteAll(User[] user);

As with insert, use subscribeOn to specify the Scheduler on which the update or delete runs, and observeOn to specify the Scheduler on which the subscriber receives the result.
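The row count emitted by the Single variants can be put to use. Here is a rough sketch, assuming the Single<Integer> update(User) variant shown above; UserUpdater and updateIfPresent are hypothetical names, not part of the original sample.

```java
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;

// Hypothetical helper, for illustration only.
final class UserUpdater {

    private final UserDao userDao;

    UserUpdater(UserDao userDao) {
        this.userDao = userDao;
    }

    // Assumes the DAO declares: @Update Single<Integer> update(User user);
    // Emits true when at least one row was changed, false when no row matched.
    Single<Boolean> updateIfPresent(User user) {
        return userDao.update(user)             // Single<Integer>: number of rows affected
                .map(rows -> rows > 0)
                .subscribeOn(Schedulers.io());  // run the UPDATE off the main thread
    }
}
```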

Query

To get the user from the database, we could write the following query in the data access object class (UserDao):

@Query("SELECT * FROM Users WHERE id = :userId")
User getUserById(String userId);

This approach has two disadvantages:

  1. It is a blocking, synchronous call
  2. We need to manually call this method every time our user data is modified

Room provides the option of observing the data in the database and performing asynchronous queries with the help of RxJava Maybe, Single and Flowable objects.

If you’re worried about threads, Room handles this for observable queries and runs them off the main thread. It’s up to you to decide on which thread the events are emitted downstream, by setting the Scheduler in the observeOn method.

For queries that return Maybe or Single, make sure you’re calling subscribeOn with a different Scheduler than AndroidSchedulers.mainThread().

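To start using Room with RxJava 2, add the following dependencies to your build.gradle file. These are the pre-AndroidX coordinates this article was first written against; on AndroidX the RxJava 2 artifact is androidx.room:room-rxjava2, and the insert, update and delete return types described above need Room 2.1.0-alpha01 or newer: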

// RxJava support for Room
implementation "android.arch.persistence.room:rxjava2:1.0.0-alpha5"
// Testing support
androidTestImplementation "android.arch.core:core-testing:1.0.0-alpha5"

Maybe

@Query("SELECT * FROM Users WHERE id = :userId")
Maybe<User> getUserById(String userId);

Here’s what happens:

  1. When there is no user in the database and the query returns no rows, Maybe will complete.
  2. When there is a user in the database, Maybe will trigger onSuccess and it will complete.
  3. If the user is updated after Maybe was completed, nothing happens.
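The three outcomes map onto the three callbacks of Maybe.subscribe. The sketch below assumes the Maybe<User> getUserById(String) query above and the RxAndroid AndroidSchedulers class. MaybeUserLoader and the log tag are hypothetical, and getUserName() is taken from the test later in this article.

```java
import android.util.Log;

import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

// Hypothetical helper, for illustration only.
final class MaybeUserLoader {

    private static final String TAG = "MaybeUserLoader";

    static Disposable load(UserDao userDao, String userId) {
        return userDao.getUserById(userId)                  // Maybe<User>
                .subscribeOn(Schedulers.io())               // run the query off the main thread
                .observeOn(AndroidSchedulers.mainThread())  // deliver callbacks on the main thread
                .subscribe(
                        // onSuccess: a matching row exists
                        user -> Log.d(TAG, "Found " + user.getUserName()),
                        // onError: the query itself failed
                        error -> Log.e(TAG, "Query failed", error),
                        // onComplete: the query returned no rows
                        () -> Log.d(TAG, "No user with id " + userId));
    }
}
```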

Single

@Query("SELECT * FROM Users WHERE id = :userId")
Single<User> getUserById(String userId);

Here are some scenarios:

  1. When there is no user in the database and the query returns no rows, Single will trigger onError(EmptyResultSetException.class)
  2. When there is a user in the database, Single will trigger onSuccess.
  3. If the user is updated after the Single has emitted, nothing happens.
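Because an empty result surfaces as an error, callers of the Single variant usually want to tell "no such user" apart from a real failure. One possible sketch, assuming the Single<User> getUserById(String) query above and the AndroidX package for EmptyResultSetException (with the pre-AndroidX artifacts, the import would be android.arch.persistence.room.EmptyResultSetException). SingleUserLoader, loadOrDefault and fallback are hypothetical names.

```java
import androidx.room.EmptyResultSetException;

import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;

// Hypothetical helper, for illustration only.
final class SingleUserLoader {

    // Emits the stored user, or the given fallback when the query matched no rows.
    // Any other error is passed through unchanged.
    static Single<User> loadOrDefault(UserDao userDao, String userId, User fallback) {
        return userDao.getUserById(userId)   // Single<User>
                .onErrorResumeNext(error ->
                        error instanceof EmptyResultSetException
                                ? Single.just(fallback)
                                : Single.<User>error(error))
                .subscribeOn(Schedulers.io());
    }
}
```

If a missing row is an ordinary situation in your app rather than an exceptional one, declaring the query as Maybe<User> avoids the error path altogether.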

Flowable/Observable

@Query("SELECT * FROM Users WHERE id = :userId")
Flowable<User> getUserById(String userId);

Here’s how the Flowable/Observable behaves:

  1. When there is no user in the database and the query returns no rows, the Flowable emits nothing: neither onNext nor onError is called.
  2. When there is a user in the database, the Flowable will trigger onNext.
  3. Every time the user data is updated, the Flowable object will emit automatically, allowing you to update the UI based on the latest data.
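A sketch of what observing the user from the UI layer might look like, assuming the Flowable<User> getUserById(String) query above and the RxAndroid AndroidSchedulers class. UserObserver, start, stop and render are hypothetical names. The distinctUntilChanged call is an optional addition, on the assumption that Room may re-run the query when other rows of the same table change; it only filters repeats if User implements equals().

```java
import android.util.Log;

import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.functions.Consumer;

// Hypothetical helper, for illustration only.
final class UserObserver {

    private final CompositeDisposable disposables = new CompositeDisposable();

    // Call when the screen becomes visible; render receives every new version of the user.
    void start(UserDao userDao, String userId, Consumer<User> render) {
        disposables.add(
                userDao.getUserById(userId)                         // Flowable<User>, never completes
                        .distinctUntilChanged()                     // skip repeats; needs User.equals()
                        .observeOn(AndroidSchedulers.mainThread())  // update the UI on the main thread
                        .subscribe(
                                render,
                                error -> Log.e("UserObserver", "Query failed", error)));
    }

    // Call when the screen goes away, so the subscription does not leak.
    void stop() {
        disposables.clear();
    }
}
```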

Testing

Testing a query that returns a Maybe/Single/Flowable is not very different from testing its synchronous equivalent. In the UserDaoTest, we make sure that we use an in-memory database, since the information stored here is automatically cleared when the process is killed.

@RunWith(AndroidJUnit4.class)
public class UserDaoTest {

    private UsersDatabase mDatabase;

    @Before
    public void initDb() throws Exception {
        mDatabase = Room.inMemoryDatabaseBuilder(
                InstrumentationRegistry.getContext(),
                UsersDatabase.class)
                // allowing main thread queries, just for testing
                .allowMainThreadQueries()
                .build();
    }

    @After
    public void closeDb() throws Exception {
        mDatabase.close();
    }

    // The @Rule and @Test members shown below also go inside this class.
}

Add the InstantTaskExecutorRule rule to your test, to make sure that Room executes all the database operations instantly.

@Rule
public InstantTaskExecutorRule instantTaskExecutorRule =
        new InstantTaskExecutorRule();

Let’s implement a test that subscribes to the emissions of getUserById and checks that, once the user has been inserted, the Flowable emits the correct data.

@Test
public void insertAndGetUserById() {
    // Given that we have a user in the data source
    mDatabase.userDao().insertUser(USER);

    // When subscribing to the emissions of user
    mDatabase.userDao()
            .getUserById(USER.getId())
            .test()
            // assertValue asserts that there was only one emission
            .assertValue(new Predicate<User>() {
                @Override
                public boolean test(User user) throws Exception {
                    // The emitted user is the expected one
                    return user.getId().equals(USER.getId()) &&
                            user.getUserName().equals(USER.getUserName());
                }
            });
}
```
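The empty case described in the Flowable section can be covered in the same way. The method below is a sketch meant to sit inside UserDaoTest next to the test above; it reuses the same mDatabase field and USER constant, and the method name is hypothetical.

```java
// Goes inside UserDaoTest, alongside insertAndGetUserById().
@Test
public void getUserByIdWhenNoUserInserted() {
    // Given an empty database, when subscribing to the emissions of user
    mDatabase.userDao()
            .getUserById(USER.getId())
            .test()
            // Then the Flowable has emitted nothing
            .assertNoValues();
}
```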

That’s it! If you use RxJava 2 in your app, make your database reactive too and ensure that your UI always shows the latest data. Check out a sample app using Room and RxJava here.