Observables based on resources and the “using” method

Often we need to create an Observable that uses a resource to produce the items it emits. We are using the term resource here, in the same sense that it is used in the term try-with-resources:

A resource is an object that must be closed after the program is finished with it.

In the Rx-world, “finished with it” might refer to a number of things:

  • the Observable completing successfully
  • the Observable completing with an error
  • the Observable being disposed by the downstream.

An example of an Observable that is based on a resource could be an Observable that emits the lines of a text file as Strings. In order to create such an Observable, we would read the file using aBufferedReader and then we would emit the items as shown in the snippet below:

Of course we would need to close the reader when we’re done with it. One way to tackle this would be to follow the principle that the responsibility for closing a resource lies with the party that opened it in the first place. In our case, that would be the caller of the above function, that created the reader and passed it to us.

But the caller should be take care to close the reader in all 3 cases discussed above (successful completion, error or disposal). Luckily, the Observable API offers us a method that can cover all these scenarios:

doFinally(Action onFinally)
Calls the specified action after this Observable signals onError or onCompleted or gets disposed by the downstream.

A call to doFinally is effectively equivalent to a combination of doOncomplete, doOnError and doOnDispose.

But what if we wanted to close the reader automatically? The Rx library does offer us a way to achieve this, by means of the Observable.using method.

Constructs an ObservableSource that creates a dependent resource object which is disposed of when the downstream calls dispose().
resourceSupplier - the factory function to create a resource object that depends on the ObservableSource
sourceSupplier - the factory function to create an ObservableSource
disposer - the function that will dispose of the resource
Returns:the ObservableSource whose lifetime controls the lifetime of the dependent resource object

In our case, applying this method to achieve the auto-closing of the reader would look like this:

A subtle point to consider now that we are supporting auto-closing is that inside the simple method fromBufferedReader it is very important to do the check to !e.isDisposed and to do it before the reader.readLine call (line 10 in the first gist above). If the Observable is disposed of by the downstream, we need to abort before reading the next line, because the reader will be closed.

Show your support

Clapping shows how much you appreciated Aris Papadopoulos’s story.