Почему вам НАДО отписываться от Observable?

Higher order observables, subscribes and unsubscribes

Посмотрите на следующий пример и ответьте для себя на вопрос: Здесь нужно отписаться от подписки… или нет?
class TokenInterceptor implements HttpInterceptor {
intercept(
request: HttpRequest<any>,
next: HttpHandler
): Observable<HttpEvent<any>> {
return interval(1000).pipe(
map(() => next.handle(request))
);
}
}

const subscription = http.get('https://.../api/get')
/** Did you unsubscribe? */
.subscribe(() => subscription.unsubscribe());

Существует множество статей на тему отписок в RxJS, но некоторые вещи в них не освещаются либо имеются некоторые заблуждения. Важно понимать, что любой код без отписки от Observable, это потенциальная возможность получить утечку памяти и регрессию в производительности. Поэтому давайте пройдемся по нескольким примерам и наконец-то разберемся, когда нужно отписываться, а когда не нужно.


Не делайте подписки

Самый простой способ отписываться от Observable, это просто не делать подписки. Да! Это весь способ. Вам просто не нужно делать подписки. Но, вы возможно скажете, а как же я получу все необходимые мне данные? Все очень просто, если вы используете Angular, то достаточно просто передать свой Observable в AsyncPipe. Например, это может выглядеть так:

<div *ngFor=”source | async”></div>
<app [data]="source | async"></div>
А еще так {{ source | async }}

Все, вы не сделали подписку, значит вам не нужна отписка! Ура!

С получением данных разобрались, но как же правильно работать с данными?


Работа с данными

При работе с данными могут возникнуть ситуации, когда у вас не получится просто взять AsyncPipe и начать использовать его для подписки. И для того, чтобы раскрыть тему, давайте выделим несколько основных групп:
— получение данных при изменении входных параметров;
— многопоточное получение данных из различных источников;
— отправка последовательных запросов;
— создание, удаление и обновление данных.


Простой запрос с подпиской и отпиской

Давайте рассмотрим ситуацию, когда у нас есть некий Http запрос, и нам необходимо получить данные с сервера. Условно он может выглядеть так:

const request = http.get('https://.../api/get');

Отлично, когда у нас есть запрос, мы можем на него подписаться. Давайте это и сделаем:

request.subscribe((response) => console.log(response));

Теперь, когда мы подписались, то совершили очень важную ошибку. Мы забыли отписаться от подписки! Но, вы спросите, а зачем? Мы же получаем данные и http сам завершит поток, когда придут данные.


Популярные ошибки при работе с подписками

Смотрите, предположим, что мы не просто получаем данные, а записываем их в параметры компонента:

request.subscribe((response) => this.response = response);

В таком случае, может произойти следующее. Например, вы сделали запрос, но когда ответ еще не пришел с бекенда, вы уничтожите компонент за ненадобностью, то ваша подписка будет удерживать ссылку на компонент, тем самым создавая потенциальную возможность утечки памяти.

Или вот еще пример, когда запускается другая подписка или асинхронное действие внутри подписки:

request.subscribe((response) =>
// Запускается таймер у компонента через setInterval
setInterval(() => ..., 1000)
);
request.subscribe((response) =>
// Запускается таймер у компонента через interval
interval(1000).subscribe(() => ...)
);

А если вы все еще не хотите отписываться от Http, то посмотрите еще раз на этот пример:

class TokenInterceptor implements HttpInterceptor {
intercept(
request: HttpRequest<any>,
next: HttpHandler
): Observable<HttpEvent<any>> {
return interval(1000).pipe(
map(() => next.handle(request))
);
}
}

const subscription = http.get('https://.../api/get')
/** Did you unsubscribe? */
.subscribe(() => subscription.unsubscribe());

В данной ситуации, если не сделать отписку, то поток никогда не завершится. Вы точно уверены, что не столкнетесь с подобной ситуацией?

В реальной разработке ничто не гарантирует вам, что подобного никогда не произойдёт. И как итог, вы получите реальную утечку памяти в вашем приложении. Запустите несколько таких подписок, и ваша память не будет очищаться, а браузер будет заниматься ненужной и бесполезной работой, и в конечном счете упадет с ошибкой.


Операторы take, first, takeWhile…

НЕ РАБОТАЮТ! У вас все еще нет гарантий, что в поток точно придут данные, а ваш код не будет иметь побочные действия, которые будут удерживать в памяти ссылки на ваши компоненты. Или создадут ошибку при работе с сущностями, которые уже были уничтожены при уничтожении компонента.

Например, у вас нет гарантий что в поток сделается emit. Или то что, обработчик события не упадет с ошибкой из-за того, что ваш View уже уничтожен.

Если вам что-то не нужно, то просто уничтожьте это и дайте этому умереть.

И на эту тему есть отличный доклад. Если у вас еще остались сомнения, то просто посмотрите его:


Сделал подписку, сделай и отписку

Для того, чтобы отписаться от подписки можно использовать простой метод unsubscribe():

subscription: SubscriptionLike;
ngOnInit() {
this.subscription = request.subscribe(...);
}
ngOnDestroy() {
if (this.subscription) {
this.subscription.unsubscribe();
this.subscription = null;
}
}

Все, теперь мы точно уверены в том, что наша подписка всегда получит отписку и мы никогда не получим утечку памяти! И обратите внимание, что нам необходимо создать подписку при инициализации компонента, а при уничтожении компонента проверить наличие созданной подписки, отписаться, и затереть на нее ссылки. Вам не кажется это… многословным? Посмотрите вот на это:

ngOnInit() {
this.subscription1 = request1.subscribe(...);
this.subscription2 = request2.subscribe(...);
// ...
this.subscriptionX = requestX.subscribe(...);
}
ngOnDestroy() {
if (this.subscription1) {
this.subscription1.unsubscribe();
this.subscription1 = null;
}
if (this.subscription2) {
this.subscription2.unsubscribe();
this.subscription2 = null;
}
// ...
if (this.subscriptionX) {
this.subscriptionX.unsubscribe();
this.subscriptionX = null;
}
}

Это выглядит ужасно! Давайте попробуем упростить работу с отписками.


Сохранение всех подписок в список

Самый простой способ отписаться от всех подписок: это сохранить их в список и просто пройтись по нему. Давайте так и сделаем:

subscriptions: SubscriptionLike[] = [];
ngOnInit() {
this.subscriptions.push(request.subscribe(...));
this.subscriptions.push(request.subscribe(...));
this.subscriptions.push(request.subscribe(...));
}
ngOnDestroy() {
this.subscriptions.forEach(
(subscription) => subscription.unsubscribe());
this.subscriptions = [];
}

Окей, ладно, мы уменьшили количество операций и кода стало меньше, но его все равно много! Давайте попробуем другой способ.


Использование Subscription add

У каждой подписки можно добавить teardown effect, который добавляется с помощью метода subscription.add() и отрабатывает тогда, когда подписка уничтожается. Давайте попробуем это использовать:

subscriptions: Subscription = new Subscription();
ngOnInit() {
this.subscriptions.add(request.subscribe(...));
this.subscriptions.add(request.subscribe(...));
this.subscriptions.add(request.subscribe(...));
}
ngOnDestroy() {
this.subscriptions.unsubscribe();
}

Теперь у нас есть одна подписка, с которой мы работаем. В нее записываются все наши подписки, и на уничтожении компонента делается отписка от всех подписок. Но нам по прежнему приходится контролировать подписки! Давайте попробуем еще один способ.


Использование takeUntil

Оператор takeUntil работает следующим образом. Он подписывается на источник данных, и в него передается некий Observable, который сообщает когда сделать отписку.

И будем использовать ReplaySubject, так как он сделает эмит последнего сообщения на случай, если подписка будет создана после уничтожения компонента.

Вот и все, давайте попробуем его использовать. Самый простой вариант может выглядеть следующим образом:

destroy: Observable<any> = new ReplaySubject<any>(1);
ngOnInit() {
request.pipe(takeUntil(this.destroy)).subscribe();
request.pipe(takeUntil(this.destroy)).subscribe();
request.pipe(takeUntil(this.destroy)).subscribe();
}
ngOnDestroy() {
this.destroy.next(null);
this.destroy.complete();

}

Нам все еще надо контролировать destroy! Есть несколько вариантов решения проблемы. Давайте их рассмотрим.


Использование декоратора AutoUnsubscribe

Для решения этой проблемы мы воспользуемся декоратором AutoUnsubscribe из библиотеки https://github.com/NetanelBasal/ngx-auto-unsubscribe. И вот как будет выглядеть наш пример:

@AutoUnsubscribe()
@Component({ selector: '...', template: '...' })
class ExampleComponent {
subscriptions: Subscription = new Subscription();
  ngOnInit() {
this.subscriptions.add(request.subscribe(...));
this.subscriptions.add(request.subscribe(...));
this.subscriptions.add(request.subscribe(...));
}
  ngOnDestroy() {}
}

Частично он решил нашу проблему, и автоматически сделает отписку от всех подписок, которые есть у компонента. В данном случае this.subscriptions, на который навесили все остальные подписки. И нам так же необходимо создавать бесполезный и пустой хук ngOnDestroy().

Попробуем использовать другое решение.


Использование оператора untilDestroyed

Оператор untilDestroyed() реализован тем же разработчиком, вот ссылка на пакет https://github.com/NetanelBasal/ngx-take-until-destroy. Работает он так же, как и takeUntil(), только на вход принимает ссылку на компонент, а не Observable. И вот как выглядит пример с использованием этого оператора:

ngOnInit() {
request.pipe(untilDestroyed(this)).subscribe();
request.pipe(untilDestroyed(this)).subscribe();
request.pipe(untilDestroyed(this)).subscribe();
}
ngOnDestroy() {}

Стало гораздо лучше! Но нам по прежнему необходимо создавать хук ngOnDestroy()! Может есть еще способы?


Использование сервиса NgOnDestroy

Идея заключается в том, что у всех сервисов работает хук ngOnDestroy(), и мы можем использовать это для автоматической отписки. Пример, как может выглядеть такой сервис, вы можете посмотреть здесь: https://stackblitz.com/edit/angular-auto-unsubscribe-service. И попробуем использовать этот сервис:

@Component({
selector: '...',
template: '...',
providers: [ NgOnDestroy ]
})
class ExampleComponent {
constructor(@Self() private destroy: NgOnDestroy) {}
  ngOnInit() {
request.pipe(takeUntil(this.destroy)).subscribe();
request.pipe(takeUntil(this.destroy)).subscribe();
request.pipe(takeUntil(this.destroy)).subscribe();
}
}

Теперь у нас нет необходимости создавать лишний хук ngOnDestroy(), но теперь появляется необходимость добавлять сервисы в провайдеры каждого компонента, где используется NgOnDestroy, и так же получать его через DI в constructor.

И это все основные способы как можно правильно отписаться от подписки. Вам не кажется, что это все слишком усложняет?


Не делайте подписки!

Я серьезно.

Не усложняйте себе жизнь тем, чтобы заботиться о своих подписках.

Просто перестаньте подписываться!

Делаем подписку без подписки

Если вы еще не читали мою предыдущую статью, то самое время это сделать. Это поможет вам быстро понять контекст происходящего.

Мы рассмотрели самый простой кейс http запроса, когда просто получали данные. Но что делать, если необходимо дождаться завершения предыдущего запроса? Или заменить его новым? Или выполнить все параллельно и обработать результаты разом?

Для этого RxJS предоставляет нам удобные Higher Order Observables операторы. И вот некоторые из них, которые чаще всего встречаются в разработке приложений: switchMap, mergeMap, concatMap, exhaustMap. Давайте поговорим по каждому из них в отдельности и посмотрим для чего они нужны, и когда их можно применить.

И начнем, пожалуй, с самого простого оператора. Мы будем делать запросы к нашему воображаемому API сервису. И для этого напишем простую функцию load(), которая будет эмулировать запрос.

function load(): Observable<number[]> {
return of([ 1, 2, 3 ]).pipe(
delay(1000)
);
}
https://rxviz.com/v/58GYqA4O

Теперь воспользуемся функцией загрузки данных в эффекте $load. Для этого нам понадобится switchMap().

switchMap()

switchMap() переключается на новый поток каждый раз, когда в него приходит сообщение. Если в момент нового сообщения он уже работает с потоком, то он завершает старый поток.

load$: Observable<Action> =
actions$.pipe(
ofType('load'),
switchMap(() => load())
);
https://rxviz.com/v/MogdZRqO

Отлично! Мы создали нашу первую подписку без подписки! Но что если мы хотим добавить эвентовую модель в наше приложение?


Обработка эвентов

Самые популярные эвенты, которые встречаются в приложениях, это Create, Update, Delete. Давайте реализуем простую функцию, которая будет эмулировать наш канал с эвентами:

events: Event[] = [
{ type: 'create', entity: 4 },
{ type: 'update', entity: 2, update: 5 },
{ type: 'delete', entity: 1 }
];
events$: Observable<Event> = from(events).pipe(
concatMap(pipe(of, delay(1000))),
share()
);
function channel(type: string): Observable<Event> {
return events$.pipe(
filter(event => event.type === type)
);
}

И вот что мы сделали: задали тестовый набор эвентов, который будет присылать канал, и написали функцию, которая берет из канала эвенты определенного типа. Теперь напишем эффект, который будет делать подписку на эвенты и назовем его channel$. Для этого воспользуемся оператором mergeMap().

mergeMap()

mergeMap() объединяет все сообщения нового потока с потоками сообщений, с которыми уже работает. И используется тогда, когда нам неважен порядок сообщений, но важно получить сообщения со всех потоков.

channel$: Observable<Action> =
actions$.pipe(
ofType('subscribe event'),
mergeMap((action) => channel(action.payload))
);

Мы начали получать эвенты из канала! Но теперь у нас есть одна проблема, мы не можем отписаться от эвентов, когда они больше не требуются. Например, если мы больше не захотим получать эвенты Create, то у нас не получится этого сделать. Давайте это исправим.

В данной ситуации очень хорошо подойдет оператор takeUntil(). Мы просто вклинимся в поток с командами и будем ожидать, когда приложение захочет отписаться.

function ofPayload(payload) {
return filter((action) => payload === action.payload);
}
channel$: Observable<Action> =
actions$.pipe(
ofType('subscribe event'),
mergeMap((action) => {
const unsubscribe$ = actions$.pipe(
ofType('unsubscribe event'),
ofPayload(action.payload)
);
      return channel(action.payload).pipe(
takeUntil(unsubscribe$)
);
})
);

Теперь мы можем не только подписываться на определенные события, но и отписываться от них! Наше приложение умеет работать с загрузкой данных, но представим, что бекенд не способен обрабатывать большое количество конкретных запросов.


Обработка очередей

Ранее мы уже делали функцию load(), которая эмулирует загрузку данных по сети. И так же реализовали для нее эффект $load, которая позволяет стору загружать данные.

И только что бекендеры сообщили, что больше не могут обрабатывать загрузку данных достаточно быстро, и большое количество запросов сильно нагружают бекенд. Давайте это исправим! Для этого воспользуемся оператором concatMap().

concatMap()

concatMap() объединяет все сообщения нового потока с сообщениями потоков, с которыми уже работает. И перед запуском нового потока дожидается завершения предыдущего. Используется тогда, когда нам важен порядок обработки сообщений, и важно все эти сообщения получить.

load$: Observable<Action> =
actions$.pipe(
ofType('load'),
concatMap(() => load())
);
https://rxviz.com/v/dJPMKZY8

Отлично! Бекендеры счастливы, фронтенд не отправляет все запросы сразу, но есть одно “НО”. Не надо так делать в реальном приложении, пример сильно надуманный. :) По крайней мере хорошо бы отменять предыдущие эвенты, кроме первого и последнего.

Кнопки! Совсем забыли про кнопки!


Обработка с потерей потоков

Один из самых ключевых кейсов нашего приложения будет правильная обработка взаимодействия с данными. Например, если у нас есть какая-нибудь форма или кнопка изменения сущности, то мы не заходим дважды отправлять один и тот же запрос на сервер, так как это может создать различные ошибки.

Давайте создадим простой метод, который будет эмулировать удаление сущности, а если она уже была ранее удалена, то возвращать ошибку:

const removedItems = new Set();
function remove(item: number): Observable<Response> {
const removed = !removedItems.has(item);
removedItems.add(item);

const result = removed ?
of(removed) :
throwError('Already removed');
  return timer(1000).pipe(switchMapTo(result));
}

И для начала попробуем воспользоваться оператором switchMap().

const remove$ =
actions$.pipe(
ofType('remove'),
switchMap((action) =>
remove(action.payload)
.pipe(
catchError((error) => of(error))
)
)

);
https://rxviz.com/v/0oqM1d1o

Как вы можете заметить, мы получаем ошибку при отправке двух одинаковых запросов на удаление. Потому что второй запрос отменяет первый. Давайте это исправим, и нам поможет оператор exhaustMap().

exhaustMap()

exhaustMap() обрабатывает новый поток только тогда, когда он не занят другим потоком. То есть если в момент нового сообщения он уже работает с потоком, то новый поток будет проигнорирован и никогда не обработается.

const remove$ =
actions$.pipe(
ofType('remove'),
exhaustMap((action) =>
remove(action.payload).pipe(
catchError((error) => of(error))
)
)
);
https://rxviz.com/v/EOe2RXjJ

Теперь мы делаем удаление сущности только один раз и больше не получаем никаких ошибок!


Подведем итоги?

Сегодня мы поговорили о подписках и отписках, немного затронули тему Higher Order Observables, и научили наш стор работать с эвентами, и пропускать однотипные действия, которые могут приводить к различным ошибкам. Какой вывод можно сделать из этого всего?

Не делайте подписки! Делайте их только тогда, когда это действительно требуется.

А если сделали, то не забудьте отписаться, и неважно Http запрос это или нет.

В большинстве ситуациях используйте Higher Order Observables. А конечную подписку делайте с помощью AsyncPipe в шаблоне компонента. Это позволит облегчить код, вам не придется задумываться об отписках, и позволит создавать сложные решения на RxJS в декларативном стиле, и более понятные для чтения.

Так же обратите внимание, что у каждого примера в описании скриншота есть ссылка на rxviz.com, где вы можете провести собственные эксперименты.


Ах, да, самое главное. takeUntil() должен быть в конце:

// Wrong!
pipe(
takeUntil(read),
switchMapTo(story)
);
// Ok!
pipe(
switchMapTo(story),
takeUntil(read)
);