RxJs로 비동기 작업을 나누어서 처리하기

hyeonseok Ahn
Pagecall Engineering
7 min readJan 17, 2020

문제 상황 인식하기

시간이 얼마나 걸릴지 예측이 안 되는 작업들을 연달아 처리해야 하는 경우가 있다.

예를 들어보자. PageCall 에서 대량의 이미지와 PDF를 처리하는 경우, 간혹 여기에서 자원을 많이 소모하여 병목 현상이 발생한 적이 있었다.

이 때의 상황은 다음과 같았다.

배열에 저장된 연속된 이미지가 있다. 이 이미지들을 PageCall 에서 사용할 수 있도록 가공하여 업로드를 한다. 이 때 이미지를 가공하는 작업은 비동기적으로 진행한다.

from 과 mergeMap 이용하기

이제부터는 좀 더 쉬운 예시를 위해 배열에 이미지 대신 숫자를 넣는다.

from 은 배열을 다루기 위한 RxJs의 연산자인데,

배열을 인자로 전달하면 각 배열의 배열 요소들의 값을 발행한다.

예시의 mergeMap 은 observable stream 내부에서 observable 을 다루거나 예시처럼 promise 를 다룰 수 있다. mergeMap 연산자의 첫번재 인자로 전달한 함수에서 promise를 return 하면 mergeMap 은 promise가 resolve한 값을 다음 observable stream 으로 그 값(resolve 된 값)을 발행한다.

하지만 이 방법을 쓰는 경우, imgArray 의 각 요소는 순차적으로 함수를 실행하더라도 함수 자체는 각각 비동기적으로 작업하기 때문에 연산이 중첩하여 일어날 수 있다.

concatMap 으로 순차적으로 실행하기

중첩 연산이라도 막아보기 위해 앞서 실행한 비동기 작업이 완전히 끝나야 다음 작업을 시작하도록 한다. 이 때 RxJs의 concatMap 연산자가 유용하다.

concatMap은 source observable 로부터 받아온 값을 순차적으로 처리한다.

예시의 경우 concatMap 의 source observable 은 from(this.someArray) 이다. source observable 은 값을 발행하여 1, 2, 3, … 순으로 concatMap에 전달하지만, concatMap 은 첫번째 인자에 있는 함수를 완료하여 다음으로 이어지는 observable stream 으로 값을 전달할 때까지 source observable 에서 발행한 값을 버퍼에 저장한다.

참고로, concatMap이나 mergeMap의 경우 위의 Observable 을 구독하면 값들이 순차적으로 배열값 하나씩 나온다. 다시 배열 형태로 얻고 싶으면 scan연산자를 사용한다. 마지막으로 발행하는 값만 필요하기 때문에 last 연산자와 함께 사용했다.

하지만 이 방법은 앞서 실행된 비동기작업이 완료된 이후에 다음 작업을 시작하기 때문에 시간이 많이 걸린다는 단점이 있다. 예시의 배열 요소는 12개이지만, 만약 1000개의 배열 요소를 가지고 있다면? 1개의 이미지를 연산할 때 평균적으로 1초가 걸려도 약 1000초 뒤에 작업을 완료한다. 이래서야 자원을 아껴도 의미가 없다. 더 좋은 방법이 없을까?

나누어서 정복하기

일단 수확이 없는 것은 아니다. concatMap이라는 것을 알았으니까.

그렇다면 원하는 만큼만 떼어내어 딱 떼어낸 만큼은 순차적으로 작업을 진행할 수 없을까?

즉, 정해둔 기준치까지는 중첩 연산을 하더라도 그 기준을 넘어서면 작업을 더 진행하지 않는 것이다.

일단은 나누자

from 연산자를 통해 낱개로 나오기에 나눈다는 개념이 와닿지 않을 수 있다.

먼저 기준치를 정하자. 이번 예시에서 배열은 12개의 요소를 가진다. 그럼 3개씩 나누어 보자. 여기서는 bufferCount 라는 RxJs의 연산자를 사용한다.
참고로, 만약 5개로 나눈다고 하면 뒤의 2개는 bufferCount 의 기준을 채우지 못해 영영 못 나온다고 생각할 수 있는데 source observable 의 값 발행이 완료되기 때문에 bufferCount 에서도 더 이상 버퍼에 저장하지 않는다.

bufferCount 는 첫번째 인자로 버퍼에 저장할 개수(個數)를 받는다. 그리고 source observable 이 발행하는 값을 버퍼에 저장하다가, 첫번째 인자의 수만큼 값들이 모이면 값들을 배열 형태로 다음 observable stream 에 발행한다.

이제 조각이 모이는 기분이 든다. RxJs 의 묘미는 이렇게 각각의 observable stream 을 조합하여 목적에 맞는 pipeline 을 만드는 것이 아닐까.

bufferCountconcatMap 을 조합해서 사용하자.

여기서는 일부러 scanlast 를 생략했다. 값들이 3개씩 묶어 나오는 것을 확인하기 위해서이다.

나눈 것은 다시 mergeMap으로 작업하자.

앞서서 frommergeMap을 사용하여 비동기처리를 했던 것이 기억나는가? 마침 bufferCount 를 통해 dividedArray 배열을 받았으니 of 가 아니라 frommergeMap 으로 아까처럼 작업을 해보자

이 작업을 통해 달성한 것은 다음과 같다.

  1. 비동기 작업들을 기준값으로 나누고
  2. 나누어진 작업들은 내부에서 중첩 가능한 상태로 비동기 작업을 수행하지만
  3. 기준치 단위로 ‘묶은 작업’들은 순차적으로 진행한다.

하나의 배열로 모으기

앞서 예시는 좀 더 분명하게 bufferCount 의 느낌을 전달하기 위해 3개씩 묶은 배열을 차례대로 보여주었는데 하나의 배열로 결과물을 얻기 위해서는 다음과 같이 한다. scanlast 연산자의 위치에 주목한다.

pipe 안 pipe

위의 코드는 작동하는 데에는 문제가 없지만, 코드를 읽는 입장에서는 약간 곤혹스럽다. concatMap 내부의 from 에서 pipe 를 하나 더 만들었기 때문이다.

이런 nested observable stream 은 코드도 길어지고 가독성도 떨어진다.

뿐만 아니라, concatMap 이 순차적으로 관리하는 것은 bufferCount 로 나눈 단위라는 것을 확실히 인지시키기 위해서라도 이 nested observable stream은 별개의 함수로 나누는 것이 좋다.

그래서 controlChunk 라는 함수(예시에서는 메소드)로 만들어서 관리하려고 한다.

순서 보장하기

사실 여기까지 작업했으면 비동기 작업을 RxJs를 통해 기준치 단위로 나누어서 처리하는 것은 완료했다.

하지만 PageCall 의 이미지 처리에서는, 이렇게 해서 얻은 가공된 배열의 순서가 처음 배열의 순서와 동일, 즉 첫번째 이미지와 가공한 첫번째 이미지의 해쉬값이 동일하고 두번째 이미지와 가공한 두번째 이미지의 해쉬값이 동일하다는 점이 보장이 되어야만 했다.

위의 예시들은 순서가 그대로 나오지 않는다.

여기서 몇가지를 고민해볼 수 있다.

controlChunk 에서도 concatMap 을 사용하기? 이래서야 그냥 처음부터 concatMap 으로 처리하는 것과 다를 바 없다.

그렇다면 원시 타입인 숫자값에 메타 정보를 입혀서 연산이 끝난 뒤에 그 메타 정보를 토대로 배열을 정렬하는 것은? 추가 작업(메타 정보 입력 등)이 번거롭고 자칫하다간 부수 효과를 초래할 수 있다.

해결법은 간단한데, forkJoin 함수(연산자가 아니다)를 사용한다.

scan 연산자에서 spread operator 사용이 조금 바뀌었고, mergeMap 대신에 promise 를 처리하기 위하여 from 연산자와 javascript의 map 을 사용하였다.

마치며

비동기처리는 고려해야 하는 부분도 많고, 추상화하기에는 복잡한 개념이다.

하지만 PageCall 에서 사용중인 RxJS를 사용하여 비동기처리 작업을 수행하는 경우, Observable stream 이라는 개념에만 익숙해지면 높은 수준의 추상적 개념으로 비동기 처리를 다룰 수 있을 것이다.

--

--