A simple Observable implementation

Federico Knüssel
Aug 6, 2017 · 6 min read
// Shape of an observer: an object with up to three callbacks.
// Each callback here only logs the notification it receives.
{
// Receives a value and logs it.
next(value) {
console.log(value);
},
// Receives an error and logs it on the error channel.
error(err) {
console.error(err);
},
// Takes no arguments; logs 'done'.
complete() {
console.info('done');
}
}

A synchronous Observable example: Rx.Observable.from

// Reference usage with the `Rx` library: build a stream from an array and
// subscribe with three positional callbacks (value, error, completion).
const numbers$ = Rx.Observable.from([0, 1, 2, 3, 4]);

numbers$.subscribe(
  (item) => {
    console.log(item);
  },
  (failure) => {
    console.error(failure);
  },
  () => {
    console.info('done');
  },
);
/**
 * Naive Observable: stores the producer function directly as the instance's
 * `subscribe` method, so calling `subscribe(observer)` simply invokes the
 * producer and returns whatever it returns.
 *
 * @param {Function} subscribe - Producer function; receives the observer.
 */
function Observable(subscribe) {
  Object.assign(this, { subscribe });
}
/**
 * Creates an Observable that, on subscription, synchronously pushes every
 * element of `values` to the observer and then signals completion.
 *
 * @param {Array} values - Collection iterated with `forEach`.
 * @returns {Observable} Stream whose producer returns an object with an
 *   `unsubscribe` method that only logs a message.
 */
Observable.from = (values) => new Observable((observer) => {
  values.forEach((item) => {
    observer.next(item);
  });
  observer.complete();

  // All values have already been delivered by the time this object is
  // returned, so unsubscribing has nothing to cancel; it just logs.
  const subscription = {
    unsubscribe() {
      console.log('unsubscribbed');
    },
  };
  return subscription;
});
// Observer that logs every notification it receives.
const observer = {
  next: (item) => {
    console.log(item);
  },
  error: (failure) => {
    console.error(failure);
  },
  complete: () => {
    console.info('done');
  },
};

// Subscribe to five numbers, then request unsubscription after 500 ms.
const numbers$ = Observable.from([0, 1, 2, 3, 4]);
const subscription = numbers$.subscribe(observer);
setTimeout(subscription.unsubscribe, 500);

A flaw in our implementation

/**
 * Variant of `Observable.from` that calls `observer.next` once more after
 * `observer.complete()`.
 * NOTE(review): the extra emission looks intentional, as a demonstration that
 * nothing stops a producer from emitting after completion; confirm before
 * treating it as a bug.
 *
 * @param {Array} values - Collection iterated with `forEach`.
 * @returns {Observable} Stream whose producer returns an object with an
 *   `unsubscribe` method that only logs a message.
 */
Observable.from = (values) => {
  const produce = (observer) => {
    values.forEach((item) => {
      observer.next(item);
    });
    observer.complete();
    observer.next('still emitting');

    const subscription = {
      unsubscribe() {
        console.log('unsubscribed');
      },
    };
    return subscription;
  };

  return new Observable(produce);
};
/**
 * Wraps the consumer-supplied handlers and guards them: once the stream has
 * errored, completed, or been unsubscribed, no further notification reaches
 * the handlers, and each teardown function handed to the observer runs
 * exactly once.
 */
class Observer {
  /**
   * @param {{next?: Function, error?: Function, complete?: Function}} [handlers]
   *   Callbacks to forward notifications to; every one is optional.
   */
  constructor(handlers) {
    this.handlers = handlers || {}; // next, error and complete logic
    this.isUnsubscribed = false;
    this._teardown = undefined;
  }

  /** Teardown function that is still pending, or `undefined` if none. */
  get _unsubscribe() {
    return this._teardown;
  }

  /**
   * Registers the producer's teardown. A synchronous producer finishes (and
   * therefore unsubscribes) before its teardown can be assigned; in that case
   * the teardown is executed right away instead of being left pending.
   */
  set _unsubscribe(teardown) {
    this._teardown = teardown;

    if (this.isUnsubscribed) {
      this._runTeardown();
    }
  }

  /** Forwards a value to `handlers.next` unless already unsubscribed. */
  next(value) {
    if (this.handlers.next && !this.isUnsubscribed) {
      this.handlers.next(value);
    }
  }

  /** Forwards an error to `handlers.error` (if any), then unsubscribes. */
  error(error) {
    if (this.isUnsubscribed) {
      return;
    }

    try {
      if (this.handlers.error) {
        this.handlers.error(error);
      }
    } finally {
      // Release the producer even when the handler itself throws.
      this.unsubscribe();
    }
  }

  /** Signals completion to `handlers.complete` (if any), then unsubscribes. */
  complete() {
    if (this.isUnsubscribed) {
      return;
    }

    try {
      if (this.handlers.complete) {
        this.handlers.complete();
      }
    } finally {
      // Release the producer even when the handler itself throws.
      this.unsubscribe();
    }
  }

  /** Stops further notifications and runs the pending teardown, if any. */
  unsubscribe() {
    this.isUnsubscribed = true;
    this._runTeardown();
  }

  /**
   * Runs the pending teardown at most once. The reference is cleared before
   * the call so repeated or re-entrant unsubscribes do not run it again.
   */
  _runTeardown() {
    const teardown = this._teardown;
    this._teardown = undefined;

    if (teardown) {
      teardown.call(this);
    }
  }
}
/**
 * Observable that wraps every subscriber in an `Observer` before handing it
 * to the producer function.
 */
class Observable {
  /**
   * @param {Function} subscribe - Producer function. It receives the wrapping
   *   `Observer`; its return value is stored on that observer as
   *   `_unsubscribe`.
   */
  constructor(subscribe) {
    this._subscribe = subscribe;
  }

  /**
   * Starts the producer for the given handlers.
   *
   * @param {Object} obs - Handlers passed to the `Observer` constructor.
   * @returns {{unsubscribe: Function}} Handle that delegates to the
   *   observer's `unsubscribe`.
   */
  subscribe(obs) {
    const wrapped = new Observer(obs);

    // The producer runs first; only afterwards is its result attached.
    const teardown = this._subscribe(wrapped);
    wrapped._unsubscribe = teardown;

    const subscription = {
      unsubscribe: () => {
        wrapped.unsubscribe();
      },
    };
    return subscription;
  }
}
/**
 * Creates an Observable that synchronously emits each element of `values`
 * and then completes.
 *
 * @param {Array} values - Collection iterated with `forEach`.
 * @returns {Observable} Stream whose producer returns a teardown function
 *   that only logs a message.
 */
Observable.from = (values) => {
  const emitAll = (observer) => {
    values.forEach((item) => {
      observer.next(item);
    });

    observer.complete();

    const teardown = () => {
      console.log('Observable.from: unsubscribed');
    };
    return teardown;
  };

  return new Observable(emitAll);
};
// Subscribe a logging observer to five numbers, then request unsubscription
// after 500 ms.
const numbers$ = Observable.from([0, 1, 2, 3, 4]);

const subscription = numbers$.subscribe({
  next: (item) => {
    console.log(item);
  },
  error: (failure) => {
    console.error(failure);
  },
  complete: () => {
    console.info('done');
  },
});

setTimeout(subscription.unsubscribe, 500);

An asynchronous Observable example: Rx.Observable.interval

// Reference usage with the `Rx` library: create an interval stream with an
// argument of 1000 and subscribe a logging observer to it.
const interval$ = Rx.Observable.interval(1000);

interval$.subscribe({
  next: (tick) => {
    console.log(tick);
  },
  error: (failure) => {
    console.error(failure);
  },
  complete: () => {
    console.info('done');
  },
});
/**
 * Creates an Observable that emits an incrementing counter, starting at 0,
 * each time a `setInterval` timer with the given delay fires.
 *
 * @param {number} interval - Delay passed to `setInterval`.
 * @returns {Observable} Stream whose teardown clears the timer and logs a
 *   message.
 */
Observable.interval = (interval) => new Observable((observer) => {
  let counter = 0;

  const emitTick = () => {
    // Advance the counter before notifying, so it moves on even when the
    // observer throws.
    const current = counter;
    counter += 1;
    observer.next(current);
  };
  const timerId = setInterval(emitTick, interval);

  return () => {
    clearInterval(timerId);
    console.log('Observable.interval: unsubscribbed');
  };
});
// Observer that logs every notification it receives.
const observer = {
  next: (tick) => {
    console.log(tick);
  },
  error: (failure) => {
    console.error(failure);
  },
  complete: () => {
    console.info('done');
  },
};

// Tick every 100 ms and request unsubscription after one second.
const interval$ = Observable.interval(100);
const subscription = interval$.subscribe(observer);

setTimeout(subscription.unsubscribe, 1000);

Observing (asynchronous) DOM Events: Rx.Observable.fromEvent

// Reference usage with the `Rx` library: turn click events on the element
// with id 'btn' into a stream and log each notification.
const button = document.getElementById('btn');
const clicks$ = Rx.Observable.fromEvent(button, 'click');

clicks$.subscribe({
  next: (_event) => {
    console.log('clicked');
  },
  error: (failure) => {
    console.error(failure);
  },
  complete: () => {
    console.info('done');
  },
});
/**
 * Creates an Observable that forwards every `eventName` event dispatched on
 * `element` to the observer.
 *
 * @param {Object} element - Target exposing `addEventListener` and
 *   `removeEventListener`.
 * @param {string} eventName - Name of the event to listen for.
 * @returns {Observable} Stream whose teardown removes the listener and logs
 *   a message.
 */
Observable.fromEvent = (element, eventName) => {
  const attach = (observer) => {
    // One listener reference is kept so the same function can be removed later.
    const forwardEvent = (event) => observer.next(event);

    element.addEventListener(eventName, forwardEvent, false);

    const detach = () => {
      element.removeEventListener(eventName, forwardEvent, false);
      console.log('Observable.fromEvent: unsubscribbed');
    };
    return detach;
  };

  return new Observable(attach);
};
// Log button clicks, then request unsubscription after 1.5 seconds.
const clicks$ = Observable.fromEvent(button, 'click');

const subscription = clicks$.subscribe({
  next: (_event) => {
    console.log('clicked');
  },
  error: (failure) => {
    console.error(failure);
  },
  complete: () => {
    console.info('done');
  },
});

setTimeout(subscription.unsubscribe, 1500);

Operators

/**
 * Operator: returns a new Observable that emits `transformation(value)` for
 * every value of this stream and passes error and completion notifications
 * through unchanged.
 *
 * @param {Function} transformation - Applied to each emitted value.
 * @returns {Observable} The derived stream; unsubscribing from it
 *   unsubscribes from this stream.
 */
Observable.prototype.map = function (transformation) {
  const stream = this;

  return new Observable((observer) => {
    const subscription = stream.subscribe({
      next: (value) => observer.next(transformation(value)),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });

    // Invoke `unsubscribe` on the subscription itself instead of returning the
    // detached method, so implementations that rely on `this` keep working.
    return () => subscription.unsubscribe();
  });
};

Credits

Federico Knüssel

Written by

Front End Developer 👨‍💻 🇦🇺

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade