RxJS Observable interop with Promises and Async-Await

Every now and then I’m asked a question about how to use async functions or promises with RxJS, or worse, I’m told some “fact” about how async-await and Observables don’t really “work together”. RxJS has, from inception, had a high degree of interoperability with Promises. Hopefully this article will shed some light on that.

If it accepts Observable, it accepts Promise

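For example, if you’re using switchMap, you can return a Promise from its projection function just the same as you can return an Observable. The same holds for other operators and creation functions that take an Observable as input, such as mergeMap, concatMap, zip, and concat.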
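As a rough sketch of what that looks like, the snippet below returns an Observable, a Promise, and the result of an async function from switchMap. It assumes the pipeable-operator import style of RxJS 6 and later, and fetchUserName is a made-up helper for illustration, not part of RxJS.

```typescript
import { of } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Hypothetical lookup that hands back a Promise rather than an Observable.
function fetchUserName(id: number): Promise<string> {
  return Promise.resolve(`user-${id}`);
}

const source$ = of(7);

// 1. The projection returns an Observable.
const viaObservable$ = source$.pipe(switchMap(id => of(`user-${id}`)));

// 2. The projection returns a Promise; switchMap subscribes to it the same way.
const viaPromise$ = source$.pipe(switchMap(id => fetchUserName(id)));

// 3. The projection is an async function, which also returns a Promise.
const viaAsync$ = source$.pipe(
  switchMap(async id => {
    const name = await fetchUserName(id);
    return name.toUpperCase();
  })
);

viaObservable$.subscribe(name => console.log('observable:', name)); // user-7
viaPromise$.subscribe(name => console.log('promise:', name)); // user-7
viaAsync$.subscribe(name => console.log('async:', name)); // USER-7
```

One thing to keep in mind: a Promise cannot be cancelled, so when switchMap moves on to a newer inner Promise it only stops listening to the old one. The work behind the old Promise still runs to completion.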

Make a Promise-returning function retryable with defer

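If you have access to the function that creates the promise, you can wrap it with Observable.defer() (exported as a standalone defer function in RxJS 6 and later) and make an Observable that can be retried on error. A Promise starts its work as soon as it is created and settles only once, so resubscribing to an existing Promise would just replay the same rejection. defer calls your function again on every subscription, which creates a fresh Promise for each retry.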
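Here is a minimal sketch of that idea, assuming the RxJS 6+ import style. createFlakyRequest is a hypothetical stand-in for a real Promise-returning call; it fails a fixed number of times before succeeding so the retries are visible.

```typescript
import { defer } from 'rxjs';
import { retry } from 'rxjs/operators';

// Hypothetical factory: the returned function rejects for the first
// `failures` calls and resolves on the call after that.
function createFlakyRequest(failures: number): () => Promise<string> {
  let attempts = 0;
  return () => {
    attempts++;
    return attempts <= failures
      ? Promise.reject(new Error(`attempt ${attempts} failed`))
      : Promise.resolve(`succeeded on attempt ${attempts}`);
  };
}

const flakyRequest = createFlakyRequest(2);

// defer() invokes flakyRequest() anew for every subscription, so each
// resubscription made by retry() gets a fresh Promise.
const retryable$ = defer(() => flakyRequest()).pipe(retry(2));

retryable$.subscribe({
  next: value => console.log(value), // succeeded on attempt 3
  error: err => console.error('gave up:', err.message),
});
```

With retry(2) the source is subscribed to at most three times: the original attempt plus two retries.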

Define an Observable with async-await using defer()

Defer is a very powerful little tool, it turns out. An async function returns a Promise, so you can pass one to defer more or less directly: the resulting Observable emits the function’s resolved return value and then completes.
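A small sketch of this, again assuming RxJS 6 or later. The delay and loadGreeting helpers are invented for the example.

```typescript
import { defer } from 'rxjs';

// Hypothetical helper: resolves after `ms` milliseconds.
function delay(ms: number): Promise<void> {
  return new Promise(resolve => setTimeout(resolve, ms));
}

// Hypothetical async data source.
async function loadGreeting(name: string): Promise<string> {
  await delay(10);
  return `Hello, ${name}!`;
}

// The async function body runs once per subscription. Whatever it
// returns becomes the single emitted value, followed by completion.
const greeting$ = defer(async () => {
  const greeting = await loadGreeting('RxJS');
  return greeting.toUpperCase();
});

greeting$.subscribe({
  next: value => console.log(value), // HELLO, RXJS!
  complete: () => console.log('complete'),
});
```

If the async function throws, the rejection arrives as an error notification on the Observable, so operators such as retry and catchError apply as usual.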

Subscribe to an Observable with forEach to create concurrent tasks in async-await

This is a lesser-used feature of RxJS that comes from the TC39 Observable proposal. There is more than one way to subscribe to an Observable! There’s subscribe, the classic way, which returns a Subscription object that can be used to cancel the data stream. And there’s forEach, a non-cancellable way of subscribing that takes a single function to call with each next-ed value and returns a Promise. That Promise embodies the completion and error paths of the Observable: it resolves when the Observable completes and rejects if it errors.
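The sketch below shows one way this can be used inside an async function, assuming RxJS 6 or later. runTicker and main are hypothetical names, and interval stands in for any Observable that eventually completes.

```typescript
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

// Hypothetical task: records `count` ticks, `periodMs` apart. The Promise
// from forEach resolves only when the Observable completes.
async function runTicker(
  label: string,
  periodMs: number,
  count: number,
  log: string[]
): Promise<void> {
  await interval(periodMs)
    .pipe(take(count))
    .forEach(i => {
      log.push(`${label}${i}`);
    });
}

async function main(): Promise<string[]> {
  const log: string[] = [];

  // Sequential: nothing below runs until ticker "a" has completed.
  await runTicker('a', 5, 2, log);

  // Concurrent: tickers "b" and "c" run at the same time, and we
  // continue once both Observables have completed.
  await Promise.all([runTicker('b', 5, 2, log), runTicker('c', 5, 2, log)]);

  log.push('done');
  return log;
}

main().then(log => console.log(log));
```

Because forEach rejects when the Observable errors, an ordinary try/catch around the await is enough to handle failures. The trade-off is that there is no Subscription to unsubscribe from, so the source has to end on its own (here via take).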

Use toPromise() with async/await to emit the last Observable value as a Promise

The toPromise function is actually a bit tricky, as it’s not really an “operator”; rather, it’s an RxJS-specific means of subscribing to an Observable and wrapping it in a promise. The promise will resolve to the last emitted value of the Observable once the Observable completes. That means that if the Observable emits the value “hi” then waits 10 seconds before it completes, the returned promise will wait 10 seconds before resolving “hi”. If the Observable never completes, then the Promise never resolves.
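To make the timing concrete, here is a sketch that emits “hi” immediately and completes a little later (50 milliseconds rather than 10 seconds, to keep it quick). It assumes RxJS 6 or 7; note that RxJS 7 deprecates toPromise in favor of lastValueFrom and firstValueFrom. The names hiThenWait$ and readLast are made up for the example.

```typescript
import { concat, of, timer } from 'rxjs';
import { ignoreElements } from 'rxjs/operators';

// Emits "hi" right away, then stays open for 50 ms before completing.
const hiThenWait$ = concat(of('hi'), timer(50).pipe(ignoreElements()));

async function readLast(): Promise<{ value: string | undefined; elapsedMs: number }> {
  const start = Date.now();
  // Resolves with the last emitted value, but only once the
  // Observable has completed.
  const value = await hiThenWait$.toPromise();
  return { value, elapsedMs: Date.now() - start };
}

readLast().then(({ value, elapsedMs }) => {
  console.log(`got "${value}" after about ${elapsedMs} ms`);
});
```

Even though “hi” is available immediately, the await does not continue until the 50 ms timer lets the Observable complete.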

NOTE: using toPromise() is an antipattern except in cases where you’re dealing with an API that expects a Promise, or where you need to await the result in an async function.

Observables and Promises work well together

Admittedly, if you’re aiming for reactive programming, most of the time, you probably want an Observable, but RxJS tries to be as ergonomic as possible in a world where Promises are so popular. Furthermore, using RxJS Observables with forEach in async functions enables a lot of interesting possibilities for managing concurrency and tasks in a way that “just works” with async-await.

Come learn more about RxJS from me in-person or online at http://rxworkshop.com!
