Introduction to Observable

David Tomczyk
17 min read · Mar 31, 2018


For the past few months I’ve been learning and using Reactive Programming at work, specifically by implementing redux-observable middleware in the Chrome extension we are building. If I’m being honest, it’s been a difficult journey trying to wrap my head around this new programming paradigm, and I definitely still have a ways to go. However, as I’ve gotten more and more comfortable with Reactive Programming, RxJS (its JavaScript implementation), and its core concept and data type, the Observable, I figured it was a great opportunity to write a new blog post.

Given how broad the topic of Reactive Programming is, I think it’s important to first define the goals for this post. Firstly, I won’t attempt to explain what Reactive Programming is or why it’s useful. If you’re interested in that, I would recommend checking out ReactiveX.io, this great blog post, or good ol’ Wikipedia if you’re feeling particularly academic. What I will try to do is explain some of the critical concepts and data types of Reactive Programming, specifically the Observable, Observer, Subscription, Operator, and Subject.

One of my main motivations for writing this blog is something that I’ve felt, and which has been called out in other resources for Reactive Programming: there just aren’t a lot of good materials out there for learning it. Official documentation is often too advanced, technical, or dense to serve as a good introduction, and a lot of the tutorials only skim the surface or don’t provide quite enough background to make you feel comfortable with what’s going on. Now, that isn’t to say there is nothing out there worth checking out. In fact, I’ve found (and will lean on in this post) quite a few fantastic resources, such as:

So with that being said, I hope that this blog serves as a good beginner introduction to the core data types and concepts of Reactive Programming. If you walk away from this with enough knowledge to start understanding some of the official documentation, or to comfortably follow along with a tutorial you’ve found, I will have done my job. If not, please let me know how I can improve this in the comments!

Observable

As mentioned above, Observable is the core data type of Reactive Programming. In my experience, once you understand what an Observable is (and isn’t) and what it does, everything else starts to fall into place. Before we dive into Observable, though, it might be helpful to align on a definition of Reactive Programming, which should set the stage for how Observable fits into it. If you’ve started looking into Reactive Programming already, you’ve probably encountered many definitions of what it is, each slightly more confusing than the last. The most approachable definition I’ve found so far comes from Andre Staltz, and is as follows:

Reactive programming is programming with asynchronous data streams.

As Andre notes, many of us are already familiar with asynchronous data streams: think click events/event listeners. Reactive programming takes this concept and applies it to everything. And it does that primarily through the Observable.

So, what is an Observable? Given the definition of Reactive Programming provided above, it’s tempting to think of an Observable as a stream. Some resources might even compare Observables to Promises. While those comparisons may be helpful, and are certainly understandable given some of the conceptual similarities, I think they end up doing more harm than good because they overcomplicate what an Observable is. I’ve become partial to Ben Lesh’s definition of an Observable, which is:

‘A function which takes an observer and returns a function’

For now we’ll ignore the observer part (we’ll get to that soon) and just focus on the idea that an Observable is a function.

As you may have guessed, it’s a bit of an oversimplification, but one that has helped me dispel some of the magic I first attributed to Observables, and has thus made them more approachable. To provide a bit more context, have a look at the following chart:

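  • Pull, single value: Function
  • Pull, multiple values: Iterator
  • Push, single value: Promise
  • Push, multiple values: Observable

Source: ReactiveX.io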

As explained in the table’s source, Observables are ‘lazy Push collections of multiple values’. In this way they are similar to Promises. Both are data producers that provide (push) data to consumers at their own discretion, so the consumer does not control when it receives the data. This is in contrast to the pull model, in which a data consumer (i.e., the code calling a function) dictates when it receives data from the producer (i.e., the function). In that model, the producer is unaware of when data will be requested (pulled) by the consumer.

However, Observables differ from Promises in critical ways. As noted in the chart above, a Promise eventually delivers only a single value, while an Observable can deliver anywhere from zero to infinitely many values. A Promise’s value is also shared: every callback attached to it receives the same result. An Observable, by contrast, behaves like a function. Each subscription starts a fresh execution, which may produce different values. Just as two function calls trigger two separate side effects, two subscriptions to an Observable trigger two separate side effects.

Another key distinction is that Promises are always asynchronous. Even if a Promise has already resolved, the callback(s) passed to then() are not run immediately. The surrounding code keeps executing, and the callbacks run afterwards. Observables can be either synchronous or asynchronous, depending on what’s going on inside of them. We’ll see this in action soon.

Finally, Promises can be thought of as ‘hot’: they start doing their work as soon as they are created, whether or not you ever call then() on them. Observables, on the other hand, are ‘cold’: they don’t evaluate anything until you call (subscribe to) them. Until that happens, Observables just sit there, which is why they are called lazy. Now (and this will be confusing), not all Observables are cold. They can be forced into being hot, for example when they are created from a Promise. Subjects, which inherit from Observable and which we will get to later, are also hot. However, let’s ignore those for now. All the examples in the following section deal with cold Observables.
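A small sketch can make the eager vs. lazy distinction concrete. It compares a native Promise with a deliberately tiny Observable class. That class is hypothetical and written only for illustration, not taken from RxJS. It stores a function and calls it when subscribe() is called, which is roughly Ben Lesh’s definition expressed in code.

```javascript
// A deliberately tiny, hypothetical Observable (not RxJS): it stores the
// subscribe function and only calls it when someone subscribes.
class Observable {
  constructor(subscribeFn) {
    this._subscribeFn = subscribeFn;
  }
  subscribe(observer) {
    return this._subscribeFn(observer);
  }
}

const log = [];

// The Promise executor runs right away, even though nobody has called then().
const promise = new Promise((resolve) => {
  log.push("promise executor ran");
  resolve(42);
});

// The Observable's function does NOT run when the Observable is created.
const observable = new Observable((observer) => {
  log.push("observable execution ran");
  observer.next(42);
});

log.push("both created");

// Subscribing runs the Observable execution synchronously, right here.
observable.subscribe({ next: (value) => log.push(`observable delivered ${value}`) });

// A then() callback always runs later, even for an already-resolved Promise.
promise.then((value) => log.push(`promise delivered ${value}`));

log.push("end of synchronous code");
```

Reading log in order shows three things. The Promise’s executor has already run before "both created" is recorded. The Observable’s execution only runs inside subscribe(). The Promise’s then() callback lands last, after all of the synchronous code has finished.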

Our First Observable

Now that we’ve gotten some background on what an Observable is and how it works, let’s jump in and create one:

On line 3 we’re using Observable.create(), which is simply an alias for the Observable constructor. It takes one argument: a subscribe function that receives an observer as its argument. The code inside this subscribe function represents the Observable execution: a lazy computation that only happens when the Observable is called. The execution can deliver multiple values over time, and can happen either synchronously or asynchronously, as mentioned in the previous section. The code above demonstrates this: lines 4–6 happen synchronously, while the setTimeout on line 7 happens asynchronously.
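Here’s a minimal, runnable sketch of an Observable like the one described above. To avoid depending on RxJS, it uses a hand-rolled Observable class. This class is hypothetical and meant for illustration only. Its static create() just forwards to the constructor. The values 1–4 and the 1000ms delay are assumptions chosen to match the rest of this walkthrough.

```javascript
// Hypothetical stand-in for RxJS's Observable, for illustration only.
class Observable {
  constructor(subscribeFn) {
    this._subscribeFn = subscribeFn;
  }
  // Mirrors the idea that create() is just an alias for the constructor.
  static create(subscribeFn) {
    return new Observable(subscribeFn);
  }
  subscribe(observer) {
    return this._subscribeFn(observer);
  }
}

const observable = Observable.create((observer) => {
  // Synchronous part of the execution.
  observer.next(1);
  observer.next(2);
  observer.next(3);
  // Asynchronous part: delivered about 1000ms later.
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

const received = [];
let completed = false;
observable.subscribe({
  next: (value) => received.push(value),
  complete: () => {
    completed = true;
  },
});

// Only the synchronous values have arrived at this point.
console.log("received synchronously:", received);
```

Only 1, 2, and 3 have arrived by the time subscribe() returns. The value 4 and the completion show up about a second later.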

Observable executions can produce 3 types of values:

  • next(): delivers a value (e.g., a number, string, or object)
  • error(): sends a JavaScript Error or exception
  • complete(): does not send a value, only signals completion of the Observable

We can see next() and complete() above. As a best practice, we should wrap the code inside the execution in a try/catch block so that any caught exception is delivered as an error notification. Let’s do that now:

As you might guess, next() notifications are the most important and most common type, since they represent data being delivered to an Observer. In an Observable execution, zero to infinitely many next() notifications may be delivered. On the other hand, error() and complete() notifications can happen at most once per execution, and only one of the two can occur. Once either an error() or a complete() notification has been delivered, nothing else can be delivered afterwards.
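Here’s a sketch of both ideas together. The execution wraps its work in try/catch and turns the exception into an error notification. Meanwhile, subscribe() wraps the observer so nothing gets through after error() or complete(). RxJS enforces that rule for you internally. The Observable class below is a simplified, hypothetical stand-in, and the error message is made up.

```javascript
// Hypothetical, simplified Observable (not RxJS). subscribe() wraps the
// observer so nothing is delivered after error() or complete().
class Observable {
  constructor(subscribeFn) {
    this._subscribeFn = subscribeFn;
  }
  subscribe(observer) {
    let stopped = false;
    const safeObserver = {
      next(value) {
        if (!stopped && observer.next) observer.next(value);
      },
      error(err) {
        if (stopped) return;
        stopped = true;
        if (observer.error) observer.error(err);
      },
      complete() {
        if (stopped) return;
        stopped = true;
        if (observer.complete) observer.complete();
      },
    };
    return this._subscribeFn(safeObserver);
  }
}

const observable = new Observable((observer) => {
  try {
    observer.next(1);
    observer.next(2);
    throw new Error("something went wrong");
  } catch (err) {
    // Deliver the caught exception as an error notification.
    observer.error(err);
  }
  // Both of these are ignored: the execution has already errored.
  observer.next(3);
  observer.complete();
});

const events = [];
observable.subscribe({
  next: (value) => events.push(`next ${value}`),
  error: (err) => events.push(`error: ${err.message}`),
  complete: () => events.push("complete"),
});

console.log(events);
```

Only next 1, next 2, and the error reach the observer. The later next(3) and complete() calls are dropped.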

And that’s it! Our first Observable. Now let’s do something with it. Remember — Observables are lazy. The one we created above will just sit there, unexecuted, until we call it. We do this by calling subscribe() on it.

Subscribing

If subscribe() sounds familiar, that’s because it is: we provide a subscribe() function to the Observable constructor or its alias create(). While that may be confusing, it is no coincidence! Although the two are different in the RxJS library, they are conceptually equal and can be treated as such for practical purposes.

The idea is that this helps to highlight how subscribe() calls to an Observable, which cause it to execute, are not shared. When you call observable.subscribe(), the subscribe() function supplied to Observable.create() is run for that particular call. In other words, each call to observable.subscribe() triggers its own independent Observable execution.
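A quick way to see that executions aren’t shared is to count them. This sketch uses the same kind of minimal, hypothetical Observable class (not RxJS) and a made-up executions counter:

```javascript
// Hypothetical minimal Observable (not RxJS): subscribe() just calls the function.
class Observable {
  constructor(subscribeFn) {
    this._subscribeFn = subscribeFn;
  }
  subscribe(observer) {
    return this._subscribeFn(observer);
  }
}

let executions = 0;

const observable = new Observable((observer) => {
  // This body runs once per subscribe() call.
  executions += 1;
  observer.next(`execution #${executions}`);
});

const first = [];
const second = [];
observable.subscribe({ next: (value) => first.push(value) });
observable.subscribe({ next: (value) => second.push(value) });

console.log(executions, first, second);
```

The executions counter ends up at 2, and each observer only sees the value from its own execution.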

For simplicity’s sake, I’ve left out the key part of subscribing to an Observable: the Observer. Let’s dive into that now.

Observer

We’ve established that Observables are ‘lazy Push collections of multiple values’, which produce 3 types of values (next, error, and complete) and which can be executed by calling subscribe() on them. So now you may be wondering how we actually get those values. Enter the Observer: a collection of callbacks that listen to values delivered by an Observable. Again, if some of what follows looks familiar, you’re not imagining things:

As you might recall, we’ve already called these functions inside the Observable execution when we created our Observable. When we subscribe with an Observer we are simply defining what each one will actually do upon execution.

While our Observer above includes a callback for each type of value produced by an Observable, we can also provide a partial Observer (e.g., only next and complete, or only next). We can also provide these callbacks as separate arguments to observable.subscribe(). The first is used for next(), the second for error(), and the third for complete(). This is why you may sometimes see subscribe() called with up to three plain functions instead of an Observer object.
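Here’s a sketch of how one subscribe() can accept all three forms. The normalization logic belongs to a hypothetical Observable class, not RxJS’s actual source. The same goes for the default that rethrows when no error handler is given.

```javascript
// Hypothetical minimal Observable (not RxJS). subscribe() accepts a full
// observer, a partial observer, or up to three positional callbacks.
class Observable {
  constructor(subscribeFn) {
    this._subscribeFn = subscribeFn;
  }
  subscribe(observerOrNext, error, complete) {
    // Normalize positional callbacks into an observer object.
    const observer =
      typeof observerOrNext === "function"
        ? { next: observerOrNext, error, complete }
        : observerOrNext || {};
    // Fill in any missing callbacks. A missing error handler rethrows, so
    // errors aren't silently swallowed.
    return this._subscribeFn({
      next: observer.next || (() => {}),
      error: observer.error || ((err) => { throw err; }),
      complete: observer.complete || (() => {}),
    });
  }
}

const observable = new Observable((observer) => {
  observer.next("a");
  observer.next("b");
  observer.complete();
});

const log = [];

// 1. Full observer object.
observable.subscribe({
  next: (value) => log.push(`full next ${value}`),
  error: (err) => log.push(`full error ${err}`),
  complete: () => log.push("full complete"),
});

// 2. Partial observer: only next.
observable.subscribe({ next: (value) => log.push(`partial next ${value}`) });

// 3. Positional callbacks: next, error, complete.
observable.subscribe(
  (value) => log.push(`positional next ${value}`),
  (err) => log.push(`positional error ${err}`),
  () => log.push("positional complete")
);

console.log(log);
```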

Putting It All Together

We’ve now seen all the key parts of defining, executing, and getting values from an Observable. Let’s put them all together and analyze what happens.

Lines 1–23 should look familiar, as they are the same as what we’ve discussed in the previous sections, with the addition of a console.log() inside the Observable execution to highlight when that begins.

We can see that the first output is the console.log() from line 25, demonstrating that our observable is in fact lazy and has not yet executed at that point. Once subscribe() is called, the console.log() inside the Observable execution runs, and our observer’s next() callback logs the numbers 1–3 delivered by observable. All of this happens synchronously.

We then see how an Observable can be executed asynchronously if the code inside of its execution is asynchronous, such as the setTimeout() on line 10. While that timeout waits to complete, we get the console.log() from line 27, and after 1000ms has passed, we finally get our last next() value and complete() from lines 11 and 12. Et voila, our first observable has successfully been executed!
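To reproduce that sequence of logs, here’s a sketch using a minimal, hypothetical Observable class (not RxJS). It also uses a small log() helper (also hypothetical) that records each message and prints it:

```javascript
// Hypothetical minimal Observable (not RxJS).
class Observable {
  constructor(subscribeFn) {
    this._subscribeFn = subscribeFn;
  }
  subscribe(observer) {
    return this._subscribeFn(observer);
  }
}

const trace = [];
// Record each message and print it, so the order is visible.
const log = (message) => {
  trace.push(message);
  console.log(message);
};

const observable = new Observable((observer) => {
  log("observable execution started");
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

log("before subscribe");
observable.subscribe({
  next: (value) => log(`next: ${value}`),
  error: (err) => log(`error: ${err}`),
  complete: () => log("complete"),
});
log("after subscribe");
```

The synchronous part prints these messages in order: before subscribe, observable execution started, next: 1 through next: 3, and after subscribe. Then next: 4 and complete follow about a second later.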

Disposing Observable Executions

If we circle back to Ben Lesh’s definition of an observable, ‘a function which takes an observer and returns a function’, you might notice that our example and breakdown above describe only the first two-thirds of it. What about that whole ‘returns a function’ thing? As it turns out, we do capture the return value of observable.subscribe() in a variable called subscription. So what’s the point of that?

Because an Observable can deliver anywhere from zero to infinitely many values, the Observable API provides a way to abort an execution after a finite amount of time. When subscribe() is called on an Observable, a Subscription object is returned. This Subscription represents the ongoing Observable execution and allows us to cancel that execution by calling the Subscription’s unsubscribe() method.

Let’s take a look at what would happen if we called subscription.unsubscribe() in our example above:

As we can see, when we call unsubscribe() on line 30 the observable execution is canceled before the setTimeout() inside observable completes and has the opportunity to deliver the final value of 4.
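Here’s a sketch of that cancellation. It assumes a simplified, hypothetical Observable class (not RxJS) whose subscribe() returns an object with an unsubscribe() method. That method simply stops further notifications from reaching the observer.

```javascript
// Hypothetical simplified Observable (not RxJS). subscribe() returns a
// subscription; after unsubscribe(), notifications are dropped.
class Observable {
  constructor(subscribeFn) {
    this._subscribeFn = subscribeFn;
  }
  subscribe(observer) {
    let closed = false;
    this._subscribeFn({
      next: (value) => {
        if (!closed) observer.next(value);
      },
      error: (err) => {
        if (closed) return;
        closed = true;
        observer.error(err);
      },
      complete: () => {
        if (closed) return;
        closed = true;
        observer.complete();
      },
    });
    return {
      unsubscribe: () => {
        closed = true;
      },
    };
  }
}

const observable = new Observable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    // The timer still fires, but these calls are dropped after unsubscribe().
    observer.next(4);
    observer.complete();
  }, 1000);
});

const received = [];
let completed = false;
const subscription = observable.subscribe({
  next: (value) => received.push(value),
  error: (err) => console.error(err),
  complete: () => {
    completed = true;
  },
});

// Cancel the execution before the setTimeout delivers 4.
subscription.unsubscribe();
```

This default unsubscribe() only stops delivery. The pending setTimeout still fires, and its next(4) and complete() calls are quietly dropped. That is exactly why resources like intervals need their own cleanup.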

Each Observable should define how to dispose of the resources for an execution when it is created. For our simple Observable above, the API’s default unsubscribe() function was enough. However, for more complex Observables, you can and should return a custom unsubscribe() function from within the subscribe() function provided to Observable.create(). Below is an example of how that might look.

Observable.create()’s subscribe() function returns a custom unsubscribe() function containing the logic to clear the countInterval set up within the Observable execution. If this were not the case and we instead relied on the default unsubscribe() function, observable would stop delivering values after 3 seconds, just as in the previous example. However, the program would never terminate, because the interval would keep ticking!
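Here’s a sketch of the custom cleanup. It again uses a simplified, hypothetical Observable class (not RxJS) that stores whatever the subscribe function returns and calls it on unsubscribe(). The 100ms tick and the 350ms unsubscribe delay are assumptions meant to keep the demo short.

```javascript
// Hypothetical simplified Observable (not RxJS). Whatever the subscribe
// function returns is kept as teardown logic and run on unsubscribe().
// error() and complete() are left out to keep the sketch short.
class Observable {
  constructor(subscribeFn) {
    this._subscribeFn = subscribeFn;
  }
  subscribe(observer) {
    let closed = false;
    const teardown = this._subscribeFn({
      next: (value) => {
        if (!closed) observer.next(value);
      },
    });
    return {
      unsubscribe: () => {
        if (closed) return;
        closed = true;
        if (typeof teardown === "function") teardown();
      },
    };
  }
}

let cleared = false;

const observable = new Observable((observer) => {
  let count = 0;
  // Ticks every 100ms (shortened from one second to keep the demo quick).
  const countInterval = setInterval(() => {
    count += 1;
    observer.next(count);
  }, 100);
  // Custom unsubscribe logic: stop the interval so the process can exit.
  return () => {
    clearInterval(countInterval);
    cleared = true;
  };
});

const received = [];
const subscription = observable.subscribe({ next: (value) => received.push(value) });

let receivedAtUnsubscribe = null;
setTimeout(() => {
  subscription.unsubscribe();
  receivedAtUnsubscribe = received.length;
  console.log("values before unsubscribe:", received);
}, 350);
```

Because the returned function calls clearInterval(), the process can exit once the remaining timers finish. Without that return statement, the interval keeps the program running indefinitely.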

That brings us to the end of our introduction to Observable, the foundation of Reactive programming. If you’ve followed up to this point, you’re in great shape. Although this is really only the tip of the iceberg in terms of Reactive programming, everything relies on a solid understanding of Observables. The more comfortable you are with them, the easier it will be to pick up the rest of what makes Reactive programming so powerful.

Perhaps the most powerful aspect of Reactive programming is its operators. So without further ado, let’s jump into exactly what those are and how we can use them to take our use of Observables to the next level.

Operators

The definition of Operators you’ll find in ReactiveX’s RxJS Manual is:

‘Operators are the essential pieces [of Reactive programming] that allow complex asynchronous code to be easily composed in a declarative manner’

Sounds pretty fancy, but unfortunately touches more on the ‘why’ than the ‘what’. I think it's important to include here though, since it will help explain some of the features and behaviors of Operators as we move forward.

So what are Operators then? Simply put, they are functions which take a source Observable, create a new Observable based on the source Observable, and return the new Observable. To put it another way: an Operator is a pure function that takes one Observable as an input and generates another, new Observable as an output. If you take nothing else from this section, let this be it!

Given that definition, what might an operator look like? Let’s dive into some code and find out. In order to streamline things a bit, I’ve introduced a new way to create an Observable in the example below — don’t worry if it doesn’t quite make sense, I’ll explain it in the following section.

Firstly, as promised, let’s address the fancy new way we created the Observable called input on line 17. Rather than using Observable.create() and passing a subscribe() function that imperatively defines what values the Observable will deliver, I’ve instead used Observable.from(). This allows us to declaratively define an Observable’s output values by passing it an array (or another data type that from() can convert into an Observable, such as a string). If that sounds eerily similar to the ‘why’ definition of Operators given above, you’re on to something. As it turns out, Observable.create() and Observable.from() are both operators!

Specifically, they are Static Operators: pure functions attached to the Observable class. Most Static Operators, just like create() and from(), are used to create new Observables and thus are called Creation Operators. You can find a full list of them, along with their documentation, here.

On to our new Operator, square(). As we can see, it accepts one parameter, input. Inside of the function, we use Observable.create() to make a new Observable called output which is returned at the end of the square() function, thus fulfilling our contract of being pure. The real magic of this Operator, and any other, happens inside of the subscribe() function provided to Observable.create().

On line 7, we subscribe to the input Observable and provide it with an Observer object with a callback for next, error, and complete, just as we did in our first example Observable. The magic here is that each callback now also calls the observer provided from the new output Observable, from line 5. Thus, each time the input Observable emits a next value, so too does the output Observable! Given that we want this operator to square the values provided to it by an input Observable, we multiply the value provided to the next callback on line 8 by itself within the new observer.next() call. Since we don’t want to do anything fancy with error or complete notifications, we simply pass those along unchanged on lines 9 and 10.

Now that we’ve covered the basics of an operator, let’s see it in action. Notice that when we subscribe() to just the observable input, it delivers the values 1, 2, 3, 4, 5 to the simple observer we provided to it on line 18. On line 20, we create a new Observable, output, which is returned from our square() operator which takes input as its parameter. Recall that output is now a completely new Observable, since square() is a pure function. When we subscribe to output, we can see that we first receive 1, then 4, then 9, etc. All squares of the values delivered by input!
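Here’s a runnable sketch of the square() operator described above. It relies on a minimal, hypothetical Observable class and a from() helper standing in for Observable.from(). Neither is the real RxJS code.

```javascript
// Hypothetical minimal Observable (not RxJS).
class Observable {
  constructor(subscribeFn) {
    this._subscribeFn = subscribeFn;
  }
  subscribe(observer) {
    return this._subscribeFn(observer);
  }
}

// Hypothetical stand-in for Observable.from(array): emits each item, then completes.
function from(array) {
  return new Observable((observer) => {
    for (const item of array) observer.next(item);
    observer.complete();
  });
}

// square(): takes an input Observable and returns a brand-new output Observable.
function square(input) {
  const output = new Observable((observer) => {
    // Subscribing to output subscribes to input and forwards its notifications.
    input.subscribe({
      next: (value) => observer.next(value * value),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    });
  });
  return output;
}

const input = from([1, 2, 3, 4, 5]);
const squares = [];
square(input).subscribe({
  next: (value) => squares.push(value),
  error: (err) => console.error(err),
  complete: () => console.log("squares:", squares),
});
```

Subscribing to the output delivers 1, 4, 9, 16, 25. The input itself is left untouched and could still be subscribed to on its own.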

Notice that the console.log() on line 6, which indicates the start of the Observable execution for square()’s output Observable, does not occur until after we subscribe() to output on line 22. This is shown by the fact that we get ‘subscribing to output’ from line 21 first. This illustrates a key concept: we do not subscribe to an input Observable until we subscribe to the output Observable. Thus, if we ‘chain’ multiple operators together, no execution takes place until we actually subscribe to the final Observable output by that chain.

Put another way: subscribing to the output Observable also subscribes to the input Observable. This is commonly referred to as an ‘operator subscription chain’, and is one of the most powerful parts of Reactive Programming. Common Operators, such as filter and map (which behave similarly to the JavaScript array methods they are named after), can be chained together, one after the other, to create and compose new Observables declaratively.
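To watch the subscription chain propagate, this sketch applies square() twice and records when each layer is subscribed. The Observable class, the from() helper, and the extra label argument on square() are hypothetical, added only for tracing:

```javascript
// Hypothetical minimal Observable (not RxJS).
class Observable {
  constructor(subscribeFn) {
    this._subscribeFn = subscribeFn;
  }
  subscribe(observer) {
    return this._subscribeFn(observer);
  }
}

const trace = [];

// Stand-in for Observable.from(): records when it is actually subscribed to.
function from(array) {
  return new Observable((observer) => {
    trace.push("input subscribed");
    for (const item of array) observer.next(item);
    observer.complete();
  });
}

// square() with a hypothetical `label` argument, used only for tracing.
function square(input, label) {
  return new Observable((observer) => {
    trace.push(`${label} subscribed`);
    input.subscribe({
      next: (value) => observer.next(value * value),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    });
  });
}

// Build the chain input -> square -> square. Nothing executes yet.
const chained = square(square(from([1, 2, 3]), "inner square"), "outer square");
trace.push("chain built");

const values = [];
chained.subscribe({
  next: (value) => values.push(value),
  error: (err) => console.error(err),
  complete: () => trace.push("complete"),
});

console.log(trace);
console.log(values);
```

The trace shows "chain built" first. Only once the final Observable is subscribed to do the outer square, inner square, and input get subscribed, in that order. The values arrive as 1, 16, and 81, since each number is squared twice.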

Now, you might be thinking: we can’t truly ‘chain’ an Operator such as square(), given that it’s just a function. You would be correct! Most often, when we refer to Operators, we actually mean Instance Operators, which are simply methods on an instance of Observable. The filter and map operators referenced above are both Instance Operators.

The principles behind Instance Operators are just like those in the square() operator above, but the code looks just a bit different. To illustrate that, let’s create our own implementation of the map Operator:

As you can see, rather than taking an input (or source) Observable as a parameter, we can simply use this, since that will refer to whichever Observable the Operator is being called on. The reason I’ve switched to source here is to more closely match the terminology you’ll find in official documentation or source code. Other than that though, everything else is the same: we create a new output Observable, subscribe to the source Observable within its Observable execution, and finally return it at the end of the method. Because this is a map operator, we take a function as a parameter that will be applied to each value delivered by the source Observable before it is delivered by the output Observable’s next() method.
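Here’s a sketch of such a map instance operator, patched onto the prototype of a minimal, hypothetical Observable class. This is not RxJS’s actual implementation. A regular function is used rather than an arrow function so that this refers to the source Observable the method is called on:

```javascript
// Hypothetical minimal Observable (not RxJS).
class Observable {
  constructor(subscribeFn) {
    this._subscribeFn = subscribeFn;
  }
  subscribe(observer) {
    return this._subscribeFn(observer);
  }
}

// A home-made map instance operator, added to the prototype so every
// Observable instance gets it. `this` is the source Observable.
Observable.prototype.map = function (project) {
  const source = this;
  const output = new Observable((observer) => {
    source.subscribe({
      // Apply the projection function before passing the value on.
      next: (value) => observer.next(project(value)),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    });
  });
  return output;
};

// Hypothetical source standing in for Observable.from([1, 2, 3]).
const numbers = new Observable((observer) => {
  [1, 2, 3].forEach((n) => observer.next(n));
  observer.complete();
});

const results = [];
numbers.map((n) => n * 10).subscribe({
  next: (value) => results.push(value),
  error: (err) => console.error(err),
  complete: () => console.log(results),
});
```

The results array ends up as [10, 20, 30].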

As mentioned before, Instance Operators allow us to create powerful subscription chains that allow us to easily manipulate Observables:

In just 4 lines of code, we can create an Observable that delivers values from 0 to 9, filters for only those that are even, appends a string to each value, and then appends an exclamation point to those values. This is of course a silly little example, but hopefully you can appreciate how powerful this can be when applied to real-world problems, especially given the huge number of operators included in RxJS.
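Here’s a sketch of a chain like that. It is built on a minimal, hypothetical Observable class with home-made map() and filter() methods. A static range() helper stands in for a creation operator. The appended string " is even" is an arbitrary choice.

```javascript
// Hypothetical minimal Observable (not RxJS) with home-made map and filter
// instance operators, just enough to show chaining.
class Observable {
  constructor(subscribeFn) {
    this._subscribeFn = subscribeFn;
  }
  subscribe(observer) {
    return this._subscribeFn(observer);
  }
  map(project) {
    return new Observable((observer) =>
      this.subscribe({
        next: (value) => observer.next(project(value)),
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      })
    );
  }
  filter(predicate) {
    return new Observable((observer) =>
      this.subscribe({
        // Only forward values that pass the predicate.
        next: (value) => {
          if (predicate(value)) observer.next(value);
        },
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      })
    );
  }
  // Hypothetical creation helper: emits start, start + 1, ..., start + count - 1.
  static range(start, count) {
    return new Observable((observer) => {
      for (let i = start; i < start + count; i++) observer.next(i);
      observer.complete();
    });
  }
}

const output = [];
Observable.range(0, 10)
  .filter((x) => x % 2 === 0)
  .map((x) => `${x} is even`)
  .map((x) => `${x}!`)
  .subscribe({
    next: (value) => output.push(value),
    error: (err) => console.error(err),
    complete: () => console.log(output),
  });
```

The output array ends up as ["0 is even!", "2 is even!", "4 is even!", "6 is even!", "8 is even!"].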

This just about completes my introduction to Observable. However, there is one final concept we need to cover to consider this a complete introduction.

Up to now, we’ve only covered the Observable, which, as mentioned before, creates a new execution each time it is subscribed to, just like a function. While this is convenient for keeping our Observables isolated, it does leave them somewhat lacking. What if we want to subscribe a few different Observers to the same exact Observable execution? Due to Observable’s unicast nature, this is sadly not possible without some fancy Operators like share(), which I admittedly have not even begun to explore yet.

Luckily for me (and you!), there is another core data type in Reactive Programming that is multicast — that is, it can deliver values to multiple subscribed Observers at the same time. It is: the Subject.

Subjects

Subjects in Reactive Programming inherit from Observable, thus they are really just ‘special’ Observables that allow values to be multicasted to one or more Observers. In this way they are very similar to EventEmitters: both maintain a registry of all their listeners.

Just like Observables, Subjects are subscribed to with an Observer, which will then start receiving values delivered by the Subject. Observers behave exactly the same whether they are subscribed to an Observable or a Subject. In fact, they have no ‘knowledge’ of whether the values they receive come from an Observable or a Subject.

Unlike Observables however, subscribing to a Subject does not invoke a new execution, as it does for an Observable. Instead, it simply registers the Observer to a list of Observers, just like addListener() behaves for an event in JavaScript. When the Subject is ready to deliver a value, error, or complete notification, it simply iterates over its list of Observers and supplies the value/notification to each one.

So how do Subjects deliver values if subscribing doesn’t cause a new execution? Interestingly, every Subject is also an Observer, meaning that it is an object with next(), error(), and complete() methods. To provide a new value or notification to a Subject, and therefore to its Observers, simply call subject.next(value). The value will then be delivered to each Observer registered to it, hence the term multicast.

Let’s see what that looks like:

By now this should all look pretty familiar. The only difference is that on line 3 we simply create a new subject. We don’t provide it a subscribe function or any code for an Observable execution. In addition, the subscribe() calls on lines 6 and 11 don’t produce any output. Instead, our Observers only call their next() methods when subject.next() is called. When that happens, both of them are called!
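Here’s a sketch of the same idea with a hypothetical Subject class (not RxJS). It extends a minimal Observable and overrides subscribe() so that subscribing merely registers the observer. It then forwards next(), error(), and complete() to every observer on its list:

```javascript
// Hypothetical minimal Observable and Subject (not RxJS).
class Observable {
  constructor(subscribeFn) {
    this._subscribeFn = subscribeFn;
  }
  subscribe(observer) {
    return this._subscribeFn(observer);
  }
}

class Subject extends Observable {
  constructor() {
    super(null); // a Subject does not start a new execution per subscriber
    this.observers = [];
  }
  // Subscribing only registers the observer, much like addListener().
  subscribe(observer) {
    this.observers.push(observer);
  }
  // As an Observer itself, a Subject forwards each notification to everyone.
  next(value) {
    for (const observer of this.observers) if (observer.next) observer.next(value);
  }
  error(err) {
    for (const observer of this.observers) if (observer.error) observer.error(err);
  }
  complete() {
    for (const observer of this.observers) if (observer.complete) observer.complete();
  }
}

const subject = new Subject();
const log = [];

subject.subscribe({ next: (value) => log.push(`observerA: ${value}`) });
subject.subscribe({ next: (value) => log.push(`observerB: ${value}`) });
// Nothing has been delivered yet: subscribing alone produces no values.
log.push("both subscribed");

subject.next(1);
subject.next(2);
console.log(log);
```

Nothing is delivered at subscription time. Each subject.next() call reaches both observers, in the order they subscribed.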

Perhaps the most powerful aspect of Subjects is that because they are also Observers, they can be provided to the subscribe() function of any Observable. This means that whenever that Observable delivers a value or notification, any Observer subscribed to the Subject will also receive the value or notification provided by the Observable:

Every time observable delivers a value, it calls subject.next(), because subject was supplied as the Observer to subscribe(). As a result, every Observer registered to subject has its next() method called as well, and observable is now effectively multicast!
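Here’s a sketch of that pattern. It reuses the same kind of hypothetical Observable and Subject classes (not RxJS), plus a made-up executions counter to show that the source Observable runs only once:

```javascript
// Hypothetical minimal Observable and Subject (not RxJS).
class Observable {
  constructor(subscribeFn) {
    this._subscribeFn = subscribeFn;
  }
  subscribe(observer) {
    return this._subscribeFn(observer);
  }
}

class Subject extends Observable {
  constructor() {
    super(null);
    this.observers = [];
  }
  subscribe(observer) {
    this.observers.push(observer);
  }
  next(value) {
    for (const observer of this.observers) if (observer.next) observer.next(value);
  }
  error(err) {
    for (const observer of this.observers) if (observer.error) observer.error(err);
  }
  complete() {
    for (const observer of this.observers) if (observer.complete) observer.complete();
  }
}

let executions = 0;
const observable = new Observable((observer) => {
  executions += 1;
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});

const subject = new Subject();
const a = [];
const b = [];
subject.subscribe({ next: (value) => a.push(value) });
subject.subscribe({ next: (value) => b.push(value) });

// The Subject is passed as the Observer, so one execution feeds both observers.
observable.subscribe(subject);

console.log(executions, a, b);
```

The executions counter stays at 1, while both observers receive 1, 2, 3.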

One final key distinction to draw between Observables and Subjects is that Subjects, unlike Observables, cannot be reused. As mentioned previously, when an Observable errors or completes, its Observable execution ends and no further values can be delivered from it. However, because Observables execute anew each time they are subscribed to, the Observable can still execute again if subscribe() is called on it again. In contrast, when a Subject errors or completes, that’s it: it can no longer be used. Calling next() on a Subject that has errored or completed, and is therefore considered closed, will do nothing. The Subject silently ignores it. Ben Lesh provides an example of this behavior, and how to deal with it, in this fantastic blog post on Subjects. For convenience, I’ve provided some code inspired by his example below:

Notice that the calls to next() on lines 7 and 8 behave as expected. However, once subject.complete() is called on line 9, the subsequent subject.next() is ignored, with no notification or error of any kind. Only once we explicitly call unsubscribe() on the subject does a call to next() throw an error.
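Here’s a sketch that mimics the behavior described above using a hypothetical, stripped-down Subject. It is not RxJS, and error() is left out for brevity. It tracks two flags: isStopped after complete(), and closed after unsubscribe(). This is roughly how RxJS 5’s Subject distinguishes the two states. RxJS throws an ObjectUnsubscribedError where this sketch throws a plain Error.

```javascript
// Hypothetical Subject (not RxJS) mimicking the behavior described above:
// after complete(), next() is silently ignored; after unsubscribe(), next() throws.
class Subject {
  constructor() {
    this.observers = [];
    this.isStopped = false; // set by complete()
    this.closed = false; // set by unsubscribe()
  }
  subscribe(observer) {
    if (this.closed) throw new Error("object unsubscribed");
    this.observers.push(observer);
  }
  next(value) {
    if (this.closed) throw new Error("object unsubscribed");
    if (this.isStopped) return; // silently ignored
    for (const observer of this.observers) if (observer.next) observer.next(value);
  }
  complete() {
    if (this.closed) throw new Error("object unsubscribed");
    if (this.isStopped) return;
    this.isStopped = true;
    for (const observer of this.observers) if (observer.complete) observer.complete();
    this.observers = [];
  }
  unsubscribe() {
    this.isStopped = true;
    this.closed = true;
    this.observers = [];
  }
}

const log = [];
const subject = new Subject();
subject.subscribe({
  next: (value) => log.push(`next ${value}`),
  complete: () => log.push("complete"),
});

subject.next(1);
subject.next(2);
subject.complete();
subject.next(3); // ignored: no output, no error

subject.unsubscribe();
let threw = false;
try {
  subject.next(4); // now throws
} catch (err) {
  threw = true;
}
console.log(log, threw);
```

The log contains next 1, next 2, and complete. The next(3) call vanishes without a trace, and only next(4), called after unsubscribe(), throws.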

Conclusion

That completes our introduction to the core concept and data type of Reactive Programming, the Observable. In the process, we’ve covered not only Observable but also subscribing, Observers, unsubscribing, Operators, and Subjects. While this really only scratches the surface of these topics, it should at least give you the knowledge you need to understand some of the official documentation and follow along with other articles on these topics. So far I’ve really enjoyed Reactive Programming, in spite of the pains of learning it, and I hope this post makes it just a little less painful for you! It is not only a unique shift in the approach to programming, but also a very powerful tool when used correctly.


David Tomczyk

Software Engineer at Upnext, Flatiron School graduate, technophile.