Reactive Programming — A Series Of Tubes

Filip Sufitchi
Oct 21, 2016

Recently, my job has required me to familiarize myself with the new Angular 2 framework and some associated tools such as @ngrx/store. Along with those comes the concept of reactive programming. It was not entirely intuitive to me or my co-workers, but since I feel I have a grip on it now, this post is for their benefit and for the benefit of anyone else confused about what an Observable, Observer, Subject, subscription, and so on actually are.

So… What is reactive programming? It’s not a big truck, it’s a series of tubes!

No, I’m not kidding. Let’s consider a basic program: parsing and displaying someone’s name. Forget about the actual code to do this. Let’s talk conceptually about how data would flow through such a program.

Imagine an empty virtual tube that your data (the name you want to parse) passes through. Right now, the tube is boring and does nothing to the name. It looks like this:

If we want to parse a name, the tube should actually do something! However, tubes are tubes and don’t do anything, so instead let’s add a linkage that does stuff.

Suppose that in the process of parsing this full name, we only need the first name. We could add another linkage that does stuff to grab our array of words in the name, and return just the first one:

Look at that! We have the result we need! Let’s dive further, though. Pipes and tubes are meant to carry stuff. You don’t just dump something on them. So, what happens if we put several values through this pipe?

Well, once we give the first pipe the “next” input value, it carries that value through the system, and spits out the new output at the end. Speaking of spitting the output out, we don’t want to dump those first names on the ground. We should feed them into a device that actually uses them to do something useful. Let’s call it a consumer.

This consumer takes whatever we give it (in this case, first names) and prints it. Remember, we’re not worried about actual code right now, so the semantics of printing are irrelevant.

Let’s go a step further: getting the last name. We could build a separate chain of pipes and processors like above, but that would be inefficient. We would be splitting the name twice, when we really only need to once. Let’s “tap” into the pipe after the split happens.

Now the single action of putting a new name in will trigger two consumers! Neat!

Let’s go one more step! Let’s use this data we extracted to get an output that looks like “J Smith”. Skipping a couple steps, our system now looks like this:

That “zip” there is a special component that we can feed pipes into. It takes values from each incoming pipe and groups them in a one-to-one manner. For example, suppose that getting the initial of “John” is an extremely time-consuming task. “Zip” receives “Smith” from the last name pipe, but does not pass it through to the consumer until “J” arrives from the first letter device. That is the case even if other names come down the pipe; they just queue up. Once “J” shows up, Zip combines it with the waiting “Smith” and pushes the pair out to the consumer.

From a processing standpoint, this is extremely useful behavior! It means that the first letter processor is asynchronous: it does not make everyone else wait for it to do its thing. The only things that wait on it are the ones that genuinely require its output (such as the Zip).
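If a peek at code helps the picture, the queueing behavior of a zip can be sketched in a few lines of plain JavaScript. This is only a toy, not how rxjs implements it; `makeZip`, `pushLeft` and `pushRight` are names made up for this illustration.

```javascript
// Toy zip, NOT the rxjs implementation. It pairs values from two inputs
// one-to-one, queueing whichever side arrives early until its partner shows up.
function makeZip(consumer) {
  const leftQueue = [];
  const rightQueue = [];

  // Emit pairs for as long as both sides have a value waiting.
  function flush() {
    while (leftQueue.length > 0 && rightQueue.length > 0) {
      consumer([leftQueue.shift(), rightQueue.shift()]);
    }
  }

  return {
    pushLeft(value) { leftQueue.push(value); flush(); },
    pushRight(value) { rightQueue.push(value); flush(); },
  };
}

const printed = [];
const zip = makeZip(pair => printed.push(pair[0] + ' ' + pair[1]));

// Last names arrive quickly and simply wait in their queue...
zip.pushRight('Smith');
zip.pushRight('Flanders');
// ...until the slow first-initial step catches up.
zip.pushLeft('J');
zip.pushLeft('S');

console.log(printed);
```

Nothing reaches the consumer while only last names have arrived. Each initial, once it shows up, is matched with the oldest waiting last name.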

Where is this even going?!

This all does have a point, I swear! Let’s go back to the world of real code.

Angular 2 and its ecosystem are based on the ReactiveX toolkit, namely rxjs. They use different, more technical nomenclature than I did above, but many of the concepts are the same. Let’s look at some code to perform our first pipeline: printing the first name of the input full name.

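// A pipe that will carry a single value
let inputPipe$ = Rx.Observable.of('John Roger Smith');
// Split the full name into words, then keep only the first word
let splitData$ = inputPipe$.map(fullName => fullName.split(' '));
let firstName$ = splitData$.map(splitName => splitName[0]);
// The consumer: print whatever comes out of the last pipe
firstName$.subscribe(name => console.log(name));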

That looks a little awful, but bear with me. Rxjs has a concept called an Observable, which serves the same purpose as our tubes/pipes in the above example. Each of the variables in the code above holds one such Observable. The inputPipe$ gets initialized as an Observable that will pass one value through itself: “John Roger Smith”. The dollar sign at its end is simply a naming convention to differentiate between an observable of a value and a static value itself.

We then call map on the Observable, which takes any values passing through it and transforms them with the given function. In this case, the given function is a split around whitespace, in order to get the words in the name. The first name is extracted using another map.

Lastly, we subscribe to the firstName$ Observable, giving it a function that consumes values that pass through it. This is a “subscriber function”, and adding it to an Observable activates it and causes the whole system to start flowing.
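To see why subscribing is what gets things flowing, here is a toy stand-in for an Observable, written from scratch. It is a sketch of the idea only and not the rxjs implementation; `TinyObservable` and the `log` array are made up for this illustration.

```javascript
// Toy stand-in for an Observable, NOT the rxjs implementation.
class TinyObservable {
  // producer is a function that receives a consumer and pushes values into it.
  constructor(producer) {
    this.producer = producer;
  }

  // Hypothetical helper mirroring the idea of Rx.Observable.of.
  static of(...values) {
    return new TinyObservable(consumer => values.forEach(v => consumer(v)));
  }

  // Returns a new tube that transforms values; nothing runs yet.
  map(transform) {
    return new TinyObservable(consumer =>
      this.producer(value => consumer(transform(value))));
  }

  // Attaching a consumer is what actually starts the flow.
  subscribe(consumer) {
    this.producer(consumer);
  }
}

const log = [];
const firstName$ = TinyObservable.of('John Roger Smith')
  .map(fullName => { log.push('split'); return fullName.split(' '); })
  .map(splitName => { log.push('pick first'); return splitName[0]; });

log.push('pipeline built');
firstName$.subscribe(name => log.push('got ' + name));

console.log(log);
// → [ 'pipeline built', 'split', 'pick first', 'got John' ]
```

The "pipeline built" entry comes first: defining the tubes does no work, and the split only happens once a consumer is attached.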

The code probably looks a bit cumbersome. That’s because rxjs is intended to be used in a “functional” manner, chaining calls instead of storing their results in variables when possible. Let’s do that, but actually implement all of the processing in order to print “J Smith”.

let source$ = Rx.Observable.of('John Roger Smith');
let splitName$ = source$
  .map(name => name.split(' '));
let firstInitial$ = splitName$
  .map(splitName => splitName[0])
  .map(firstName => firstName[0]);
let lastName$ = splitName$
  .map(splitName => splitName[splitName.length - 1]);
let initialedName$ = Rx.Observable.zip(firstInitial$, lastName$)
  .map(zipArray => zipArray[0] + ' ' + zipArray[1]);
initialedName$.subscribe(name => console.log(name));

Note that we are defining how each value is derived, as opposed to caring about any particular value. That is, the code above says: “a first initial is what you get when you take a split name, pick its first word, and take that word’s first letter”. It is declarative as opposed to procedural. It means that the source can be replaced with anything and the values downstream will update appropriately.

To demonstrate this, let’s use another tool in the rxjs bag of tricks: Subjects. Subjects are objects that act like Observables, but can be fed values and act as sources.

let source$ = new Rx.Subject();
// Rest of code from above
source$.next('John Roger Smith');
source$.next('Sarah A Q Flanders');
source$.next('Charlie Vanderbilt');

That should print out “J Smith”, “S Flanders”, and “C Vanderbilt”.
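The double role of a Subject (something you can subscribe to and something you can feed) can also be sketched by hand. This is a simplified toy rather than the rxjs class; `TinySubject` and `toInitialedName` are names invented for this example, with the helper standing in for the whole map/zip pipeline.

```javascript
// Toy Subject, NOT the rxjs class. It remembers its consumers and
// forwards every value handed to next() to each of them.
class TinySubject {
  constructor() {
    this.consumers = [];
  }

  subscribe(consumer) {
    this.consumers.push(consumer);
  }

  next(value) {
    this.consumers.forEach(consumer => consumer(value));
  }
}

// Hypothetical helper standing in for the split / first initial / last name steps.
function toInitialedName(fullName) {
  const splitName = fullName.split(' ');
  return splitName[0][0] + ' ' + splitName[splitName.length - 1];
}

const source$ = new TinySubject();
const initialedNames = [];
source$.subscribe(fullName => initialedNames.push(toInitialedName(fullName)));

source$.next('John Roger Smith');
source$.next('Sarah A Q Flanders');
source$.next('Charlie Vanderbilt');

console.log(initialedNames);
// → [ 'J Smith', 'S Flanders', 'C Vanderbilt' ]
```

In this toy, as in the example above, the consumer is attached before any name is pushed in. A value passed to `next` while nobody is listening simply goes nowhere.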

This ability to define data flows makes reactive programming excellent for single page applications or other long-lived processes in the browser and outside of it. It lets you declare changing values in an abstract way, avoiding a tangle of function definitions, and avoiding callback hell. Heck, in Angular 2, displaying initialedName$ is as simple as including this in your HTML template:

{{ initialedName$ | async }}

The latest value in that Observable will always appear there, without any more work required for the developer. I think that’s pretty nifty, and I hope you do too. If you would like to see it in action, check out a small demo featuring this article’s code here:
