A simple Observable implementation

{
next(value) {
console.log(value);
},
error(err) {
console.error(err);
},
complete() {
console.info('done');
}
}
const numbers$ = Rx.Observable.from([0, 1, 2, 3, 4]);numbers$.subscribe(
(value) => console.log(value),
(err) => console.error(err),
() => console.info('done')
);
function Observable(subscribe) {
this.subscribe = subscribe;
}
Observable.from = (values) => {
return new Observable((observer) => {
values.forEach((value) => observer.next(value));
observer.complete(); return ({
unsubscribe() {
console.log('unsubscribbed');
}
});
});
}
const observer = {
next(value) {
console.log(value);
},
error(err) {
console.error(err);
},
complete() {
console.info('done');
}
};
const numbers$ = Observable.from([0, 1, 2, 3, 4]);const subscription = numbers$.subscribe(observer);setTimeout(subscription.unsubscribe, 500);
Observable.from = (values) => {
return new Observable((observer) => {
values.forEach((value) => observer.next(value));
observer.complete(); observer.next('still emitting');

return ({
unsubscribe() {
console.log('unsubscribed');
}
});
});
}
class Observer {
constructor(handlers) {
this.handlers = handlers; // next, error and complete logic
this.isUnsubscribed = false;
}

next(value) {
if (this.handlers.next && !this.isUnsubscribed) {
this.handlers.next(value);
}
}

error(error) {
if (!this.isUnsubscribed) {
if (this.handlers.error) {
this.handlers.error(error);
}

this.unsubscribe();
}
}

complete() {
if (!this.isUnsubscribed) {
if (this.handlers.complete) {
this.handlers.complete();
}

this.unsubscribe();
}
}

unsubscribe() {
this.isUnsubscribed = true;

if (this._unsubscribe) {
this._unsubscribe();
}
}
}
class Observable {
constructor(subscribe) {
this._subscribe = subscribe;
}

subscribe(obs) {
const observer = new Observer(obs);

observer._unsubscribe = this._subscribe(observer);

return ({
unsubscribe() {
observer.unsubscribe();
}
});
}
}
Observable.from = (values) => {
return new Observable((observer) => {
values.forEach((value) => observer.next(value));

observer.complete();

return () => {
console.log('Observable.from: unsubscribed');
};
});
}
const numbers$ = Observable.from([0, 1, 2, 3, 4]);
const subscription = numbers$.subscribe({
next(value) { console.log(value); },
error(err) { console.error(err); },
complete() { console.info('done'); }
});
setTimeout(subscription.unsubscribe, 500);
const interval$ = Rx.Observable.interval(1000);interval$.subscribe({
next(value) { console.log(value); },
error(error) { console.error(error); },
complete() { console.info('done'); }
});
Observable.interval = (interval) => {
return new Observable((observer) => {
let i = 0;
const id = setInterval(() => {
observer.next(i++);
}, interval);

return () => {
clearInterval(id);
console.log('Observable.interval: unsubscribbed');
};
});
}
const observer = {
next(value) { console.log(value); },
error(err) { console.error(err); },
complete() { console.info('done'); }
};
const interval$ = Observable.interval(100);
const subscription = interval$.subscribe(observer);
setTimeout(subscription.unsubscribe, 1000);
const button = document.getElementById('btn');
const clicks$ = Rx.Observable.fromEvent(button, 'click');
clicks$.subscribe({
next(value) { console.log('clicked'); },
error(error) { console.error(error); },
complete() { console.info('done'); }
});
Observable.fromEvent = (element, eventName) => {
return new Observable((observer) => {
const eventHandler = (event) => observer.next(event);

element.addEventListener(eventName, eventHandler, false);

return () => {
element.removeEventListener(eventName, eventHandler, false);
console.log('Observable.fromEvent: unsubscribbed');
};
});
};
const clicks$ = Observable.fromEvent(button, 'click');
const subscription = clicks$.subscribe({
next(value) { console.log('clicked'); },
error(err) { console.error(err); },
complete() { console.info('done'); }
});
setTimeout(subscription.unsubscribe, 1500);
Observable.prototype.map = function (transformation) {
const stream = this;

return new Observable((observer) => {
const subscription = stream.subscribe({
next: (value) => observer.next(transformation(value)),
error: (err) => observer.error(err),
complete: () => observer.complete()
});

return subscription.unsubscribe;
});
};

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store