Reactive vs Proactive Programming with RxJS

Ben Copeland
LeanTaaS Engineering
3 min readMar 25, 2020

If we are using ReactiveX libraries such as RxJS, we know that we are technically implementing a reactive programming solution — we use RxJS’s concept of observable streams, and our code reacts to new events in the stream. Yet even if we use a reactive library, it is possible to design our software according to more of a proactive paradigm rather than a reactive one. However, we are much better served by these libraries if we have a reactive frame of mind.

Let’s take a moment to identify the distinction between a proactive and reactive approach. By proactive I am referring to an approach in which the publisher (or subject) is aware of the external subscribers and their desired states, and pushes updates to the known subscribers. The coupling with the consumer happens on the publisher side. By reactive I am referring to an approach in which the subscriber receives an update and handles it on its own.

Consider a recent scenario we encountered on IQueue for Clinics. We have an Angular app that heavily uses RxJS and we wanted to publish each change that comes up in a component’s ngOnChanges lifecycle hook to a stream that subscribers could consume.

Here’s a simplified example, in which we have a component with three @Input properties, and we have some components that want to keep track of an object that has the most recent value of each of the three properties.

interface IChanges { a: string; b: string; c: string; }

@Component({...})
class SomeComponent {
@Input() a;
@Input() b;
@Input() c;

changes$: BehaviorSubject<IChanges> = new BehaviorSubject({
a: null,
b: null,
c: null,
});

allLatestChanges: IChanges;

ngOnChanges(changes: SimpleChanges) {
if (changes.a) {
changes$.next(Object.assign(allLatestChanges, { a: changes.a.currentValue });
}
if (changes.b) {
changes$.next(Object.assign(allLatestChanges, { b: changes.b.currentValue });
}
if (changes.c) {
changes$.next(Object.assign(allLatestChanges, { c: changes.c.currentValue });
}
}
}

In the example above, we create an object with each of the three possible properties for caching, then on ngOnChanges, we merge updates from any of the three properties into that object, and we update the stream with the latest value of the combined object. This is a proactive approach — the subject knows the intent of the observer. If another observer is added which has a different intent, or if the intention of the current observer changes, the implementation of the subject must be changed.

The approach above is essentially what we started with, then we went on to refactor it to make it more reactive. For that to happen, we needed for the subject to simply publish the latest value without any knowledge of consumers. The following is an updated version of the earlier component which only publishes current updates.

@Component({...})
class SomeComponent {
@Input() a;
@Input() b;
@Input() c;

changes$: Subject<SimpleChanges> = new Subject();

ngOnChanges(changes: SimpleChanges) {
changes$.next(changes);
}
}

And then we handle the aggregation into a single object in the class that consumes the observable.

defaultAcc = {
a: null,
b: null,
c: null,
}

allChanges$ = changes$
.pipe(
scan((acc, curr) =>
Object.assign(acc, Object.fromEntries(
Object.entries(curr).map((k,v) => { [k]: v.currentValue })
)
)), defaultAcc
);

Now, the observable is only publishing new events, and the subscriber is harnessing one of RxJS’s many powerful pipes called scan to keep an accumulated state of the latest values of all possible properties that are coming through the stream. Think of scan as a reduce function that outputs the latest accumulated value on every event. So, after the refactoring, we have a general purpose subject which dispatches every possible ngOnChanges event. If more @Inputs are added, it will naturally dispatch those too. Any subscribers that are listening to the stream can do what they see fit with the event stream — whether that’s accumulating an object of all of the events, or only listening to changes on a single property.

While RxJS gives us the tools to do reactive programming, it is still possible to design code that follows a more proactive paradigm. To be more reactive, we can keep our publishing as agnostic as possible and allow or subscriptions to implement more complicated logic.

--

--

Ben Copeland
LeanTaaS Engineering

Software engineer, gamer, amateur game developer, haiku writer, entrepreneur.