RxJS: Don’t Unsubscribe
Ben Lesh

I have found takeUntil() useful in tests for making sure that streams complete when a test ends, so that different tests don’t interfere with each other.

For example, if you have streams listening to global events (like keyboard events), it’s quite easy to accidentally write tests in which the streams from previous tests are still active.

Without takeUntil(), the following test set would fail. The “yields B” test would fail because the subscriber from the “yields A” test is still listening to the keyup events, so its toBe('A') expectation also runs for the 'b' keypress.

// Emits after every test, completing any stream that uses takeUntil(stopTest$).
const stopTest$ = new Rx.Subject();
afterEach(() => {
  stopTest$.next();
});

// Emits the uppercased key of every keyup event on the document.
const uppercaseKey = () => {
  return Rx.Observable.fromEvent(document, 'keyup')
    .pluck('key')
    .map(key => key.toUpperCase());
};

it('yields A', (done) => {
  uppercaseKey()
    .do(value => expect(value).toBe('A'))
    .takeUntil(stopTest$)
    // Call done() without arguments; some runners treat an argument as a failure.
    .subscribe(() => done(), done.fail);
  document.dispatchEvent(new KeyboardEvent('keyup', { key: 'a' }));
});

it('yields B', (done) => {
  uppercaseKey()
    .do(value => expect(value).toBe('B'))
    .takeUntil(stopTest$)
    .subscribe(() => done(), done.fail);
  document.dispatchEvent(new KeyboardEvent('keyup', { key: 'b' }));
});
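
To see the leak in isolation, here is a minimal sketch that runs in plain Node without RxJS or a test runner. It is only an illustration of the mechanism, not the code above: an EventEmitter named keyboard stands in for document, a second emitter named stop stands in for stopTest$, and listenUppercase and runTwoTests are hypothetical helpers invented for this example.

```javascript
const { EventEmitter } = require('events');

// Stand-in for `document`: a shared, global event source (assumption for this sketch).
const keyboard = new EventEmitter();

// Hypothetical helper: listens to 'keyup' until `stop` emits 'stop', which is
// roughly the role takeUntil(stopTest$) plays. Omit `stop` for the leaky version.
function listenUppercase(onKey, stop) {
  const handler = (event) => onKey(event.key.toUpperCase());
  keyboard.on('keyup', handler);
  if (stop) {
    stop.once('stop', () => keyboard.removeListener('keyup', handler));
  }
}

// Runs two "tests" back to back and records what each listener saw.
function runTwoTests(useStopSignal) {
  keyboard.removeAllListeners('keyup'); // start each run from a clean slate
  const stop = new EventEmitter();
  const seenByA = [];
  const seenByB = [];

  // "yields A"
  listenUppercase((key) => seenByA.push(key), useStopSignal ? stop : undefined);
  keyboard.emit('keyup', { key: 'a' });
  stop.emit('stop'); // plays the role of afterEach(() => stopTest$.next())

  // "yields B"
  listenUppercase((key) => seenByB.push(key), useStopSignal ? stop : undefined);
  keyboard.emit('keyup', { key: 'b' });
  stop.emit('stop');

  return { seenByA, seenByB };
}

const leaky = runTwoTests(false);
const isolated = runTwoTests(true);
console.log(leaky.seenByA);    // [ 'A', 'B' ]
console.log(isolated.seenByA); // [ 'A' ]
```

In the leaky run, the first listener also receives the second key, which is exactly the situation where an expectation of 'A' would fire on 'B'. With the stop signal, each listener only sees the key dispatched in its own test.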