Практическое применение RxJS

Написание собственного Ngrx

🦊 Reactive Fox 🚀
Angular Soviet
10 min readNov 2, 2018

--

Стейт-менеджер — это не обязательно redux, его можно создать любыми доступными способами. Все популярные стейт-менеджеры для Angular заставляют определять бизнес логику в DI, глобально или в Singleton. Я призываю вас задуматься о выборе различных решений для своего приложения.

Управление состоянием

К написанию данной статьи меня сподвигло повсеместное неправильное использование RxJS, связанное с незнанием операторов, непониманием принципов работы библиотеки или основ построения реактивного и декларативного кода. В этой статье я рассмотрю часто встречающиеся кейсы и попутно вместе с вами реализуем собственный Ngrx на RxJS. Также я предполагаю, что вы уже частично знакомы с обеими библиотеками.

Какие задачи ставятся перед собственной реализацией

Для начала определимся с целями, которые мы ставим перед нашей библиотекой. Она должна обладать следующими характеристиками:
хранение текущего состояния;
изменение состояния;
обработка различных действий;
выполнение асинхронных запросов;
правильная обработка ошибок;
и самое главное, очищать состояние, когда оно больше не требуется.

Состояние Состояния

Давайте определимся с тем, что будет храниться в состоянии нашего примера. Это будет простой список чисел и индикатор: загружаются сейчас данные или нет. Интерфейс будет выглядеть следующим образом.

interface ItemsState {
items: number[];
loading: boolean;
}

И создадим состояние по умолчанию.

const defaultState: ItemsState = {
items: [],
loading: false
};

of()

Теперь, чтобы работать с состоянием как с потоком RxJS, обернем его в оператор of() для создания Observable.

of() создает поток с одним или несколькими элементами, который завершается сразу после их отправки.

state$: Observable<ItemsState> = of(defaultState);
https://rxviz.com/v/XJzKNLX8

Как видно из диаграммы, Observable возвращает состояние по умолчанию и завершается. Давайте сделаем так, чтобы он никогда не завершался.

NEVER, startWith()

Для удержания потока можно применить Subject, но пока что воспользуемся константой NEVER. Забегая вперед скажу, что мы еще вернемся к использованию Subject, а пока сосредоточимся над удержанием потока состояния.

NEVER является простым потоком RxJS, который никогда не завершится.

startWith() указывает потоку, какое должно быть первое значение. Вместе с NEVER он может заменить оператор of().

state$: Observable<ItemsState> =
NEVER.pipe(
startWith(defaultState)
);
https://rxviz.com/v/xOvKQRpJ

Обратите внимание, что теперь поток не завершается. Но каждый новый подписчик будет работать с разными потоками, а значит и с разными данными. Давайте решим эту проблему.

publishReplay(), refCount()

Для удержания потока с состоянием обычно применяют BehaviorSubject. В нашем примере более правильным решением будет применить два оператора — publishReplay() и refCount().

publishReplay() создает внутри себя буфер сообщений и первым параметром принимает размер буфера. Каждый новый подписчик сразу же получит накопленные сообщения. Для наших целей требуется хранить только последнее значение, поэтому размер буфера укажем равным одному.

refCount() реализует простой паттерн Ref Count, который используется для определения актуальности текущего потока. Если слушателей больше не останется, то он отпишется от потока, тем самым уничтожив его вместе с данными.

Обратите внимание! Если все подписчики отпишутся от Observable с refCount(), то Observable завершится. Но как только новый подписчик подпишется на Observable, то Observable запустится снова как новый.

state$: Observable<ItemsState> =
NEVER.pipe(
startWith(defaultState),
publishReplay(1),
refCount()

);
https://rxviz.com/v/58GYqgvO

Таким образом мы можем быть уверены в том, что все подписчики состояния будут работать с одним и тем же потоком, и одним и тем же набором данных.

Обратите внимание на минус текущего решения: пока что мы никак не можем управлять потоком состояния.

Поток управления изменяющий состояние

Для начала определимся, что мы хотим получить для управления нашим состоянием. Один из вариантов, это создавать и обрабатывать различные команды, с которыми будет взаимодействовать поток. Определим для них интерфейс.

interface Action {
type: string;
payload?: any;
}

В поле type будет указано название команды, а в поле payload данные, необходимые для выполнения этой команды.

Subject

Для реализации потока с командами очень хорошо подойдет Subject, о котором я упомянул в самом начале. Он создаст активный поток, который умеет не только слушать, но и отправлять данные.

С помощью Subject создадим поток с командами и назовем его actions$.

actions$: Subject<Action> = new Subject<Action>();
https://rxviz.com/v/qJyAK9aJ

Сейчас мы сделали поток управления. Теперь, давайте свяжем его с потоком состояния, заменив NEVER на actions$.

actions$: Subject<Action> = new Subject<Action>();state$: Observable<ItemsState> =
actions$.pipe(
startWith(defaultState),
publishReplay(1),
refCount()
);

Теперь у нас есть два потока: поток состояния и поток управления, которые умеют взаимодействовать друг с другом. Но пока что наше состояние просто перезаписывается новыми командами. Давайте это исправим.

Обработка команд

Для начала определим, что необходимо для обработки команд: получать из потока состояние и команду, изменять и возвращать новое состояние. Для этих целей подойдет оператор scan().

scan()

scan() получает на вход функцию reducer, в которую будет приходить текущее состояние и новое значение из потока (команда).

Опишем функцию и воспользуемся оператором scan(), чтобы состояние реагировало на команды.

function stateReducer(
state: ItemsState,
action: Action
): ItemsState => {
switch (action.type) {
default:
return state;
}
}
state$: Observable<ItemsState> =
actions$.pipe(
startWith(defaultState),
scan(stateReducer),
publishReplay(1),
refCount()
);

Теперь поток не теряет состояние, но и никак не реагирует на команды. Добавим обработку команд load и load success.

function stateReducer(
state: ItemsState,
action: Action
): ItemsState => {
switch (action.type) {
case 'load':
return { ...state, loading: true };

case 'load success':
return { ...state, loading: false };

default:
return state;
}
}
state$: Observable<ItemsState> =
actions$.pipe(
startWith(defaultState),
scan(stateReducer),
publishReplay(1),
refCount()
);

Когда приходит команда load, то состояние меняется на loading: true. А когда load success, то — loading: false. Но пока что загрузки данных не присходит.

Обработка эффектов

Наше состояние умеет реагировать на синхронные команды. Что же делать, если команды асинхронные? Нам необходим поток, который на вход будет получать одну команду, а на выход будет возвращать другую команду. Значит так и запишем.

load$: Observable<Action> = actions$;

filter()

Сначала нам надо убедиться, что пришедшая команда имеет тип load. Для этого воспользуемся оператором filter().

filter() решает пропускать ли значение дальше или нет.

load$: Observable<Action> =
actions$.pipe(
filter((action) => 'load' === action.type)
);

Для того, чтобы делать код читаемее, принято создавать собственные операторы для RxJS. В данном случае нам необходим оператор, который на вход будет принимать тип команды и фильтровать команды по этому типу.

function ofType<T extends Action>(
type: string
): MonoTypeOperatorFunction<T> {
return filter((action) => type === action.type);
}
load$: Observable<Action> =
actions$.pipe(
ofType('load')
);
https://rxviz.com/v/moY1ZEKo

Теперь, когда у нас есть отдельный поток, получающий команды определенного типа, мы можем запустить в нем загрузку данных. Для простоты будем использовать функцию, эмулирующую загрузку, а для задержки сети воспользуемся оператором delay().

delay()

delay() устанавливает задержку для потока, в нашем случае это 1 секунда.

function load(): Observable<number[]> {
return of([ 1, 2, 3 ]).pipe(
delay(1000)
);
}
https://rxviz.com/v/58GYqA4O

Теперь воспользуемся функцией для загрузки данных внутри эффекта load$. Для этого нам понадобится оператор switchMap().

switchMap()

switchMap() создает новый поток каждый раз, когда в него приходит сообщение. Если в момент нового сообщения он уже работает с потоком, то он завершает старый поток.

load$: Observable<Action> =
actions$.pipe(
ofType('load'),
switchMap(() => load())
);
https://rxviz.com/v/7JXXaK6J

Теперь когда поток load$ возвращает данные из функции load(), обернем их команду load success, а данные положим в payload. Для этого воспользуемся оператором map().

map()

map() получает данные из потока, изменяет и возвращает их обратно в поток.

load$: Observable<Action> =
actions$.pipe(
ofType('load'),
switchMap(() => load()),
map((data): Action => ({
type: 'load success',
payload: data
}))

);
https://rxviz.com/v/RoQ7y2qJ

Теперь у нас есть эффект, который получает команду, загружает данные и возвращает их в нужном нам виде.

Объединяем все вместе

Прежде чем подступить к реализации обработки команды load success, необходимо внести некоторые изменения. Нам необходимо отвязать прямую зависимость между state$ и actions$. Сделаем новый поток и назовем его dispatcher$. Он будет объединять в себе все сообщения из потока actions$ и эффекта load$. Для этого воспользуемся оператором merge().

merge()

merge() берет сообщения из всех указанных потоков и объединяет в один.

dispatcher$: Observable<Action> = merge(actions$, load$);

Теперь соединим все воедино: у state$ заменим поток с actions$ на dispatcher$, а так же обновим reducer и посмотрим на результат.

function stateReducer(state, action) {
switch (action.type) {
// ...
case 'load success':
return {
...state,
items: action.payload,
loading: false
};
// ...
}
}
state$: Observable<ItemsState> =
dispatcher$.pipe(
startWith(defaultState),
scan(stateReducer),
publishReplay(1),
refCount()
);

Обработка ошибок

И еще один важный момент — это правильная обработка ошибок. Давайте сделаем запрос, который будет постоянно возвращать ошибку. Для этого создадим новую функцию loadWithError(), которая будет эмулировать ошибку при загрузке с такой же задержкой в 1 секунду.

timer()

timer() запускает выполнение потока через указанное время, в нашем случае через 1 секунду.

switchMapTo()

switchMapTo() делает переключение на переданный поток, в нашем случае просто возвращаем поток с ошибкой

throwError()

throwError() создает поток с ошибкой.

function loadWithError() {
return timer(1000).pipe(
switchMapTo(throwError('Something wrong!'))
);
}

Подключим ее в наш эффект load$, а для обработки ошибок применим оператор catchError().

catchError()

catchError() срабатывает в том случае, если поток завершается с ошибкой и позволяет ее обработать.

/**
* Неправильный код (!)
**/
const load$ =
actions$.pipe(
ofType('load'),
switchMap(() => loadWithError()),
map((data) => ({
type: 'load success',
payload: data
})),
catchError((error) => of({
type: 'load failed',
payload: error
}))

);
/**
* Неправильный код (!)
**/

И обработаем полученную команду с ошибкой в нашем stateReducer(). Обратите внимание, что при инициализации загрузки мы затираем ошибку в состоянии.

function stateReducer(state, action) {
switch (action.type) {
case 'load':
return {
...state,
error: null,
loading: true
};
// ...
case 'load failed':
return {
...state,
error: action.payload,
loading: false
};

// ...
}
}

Как вы можете заметить, то эффект срабатывает всего лишь один раз, хотя отправляется три команды на загрузку. Это происходит из-за того, что поток с эффектом load$ завершается и больше не получает команды. Давайте это исправим. Для этого нам необходимо перенести обработку загрузки данных и обработку ошибки под switchMap().

const load$ =
actions$.pipe(
ofType('load'),
switchMap(() =>
loadWithError().pipe(
map((data) => ({
type: 'load success',
payload: data
}))
,
catchError((error) => of({
type: 'load failed',
payload: error
}))

)
)
);

Теперь наши ошибки обрабатываются правильно, а поток с эффектом не завершается после ошибок. Ура!

Итоги

Конечно, реализация в предложенном не подойдет к использованию в реальных приложениях, так как требует серьезных доработок и декомпозиции. Но даже на текущем этапе она дает полную свободу действий, в отличии от других решений.

Настоятельно рекомендую тем, кто только начал знакомство с RxJS, попробовать описанные в статье подходы на практике с применением других операторов, и написать свой оператор, например select().

Так же обратите внимание, что у каждого примера в описании скриншота есть ссылка на rxviz.com, где вы можете провести собственные эксперименты.

И кому хочется потыкать на кнопочки, ниже есть готовый пример на stackblitz.com.

--

--

🦊 Reactive Fox 🚀
Angular Soviet