for await … of 를 사용하여 stream을 synchronous 하게 처리하는 방법

전혁수
네이버 플레이스 개발 블로그
5 min readFeb 12, 2020

TL;DR

Node.js 환경에서 backpressure 가 일어나는 경우 for await ... of 를 사용하면 좋습니다.

Backpressure

backpressure는 생산(produce)하는 것보다 소비(consume)하는 것이 느릴 때 문제가 생기는데요.

간단한 예로 일반 텍스트 파일을 읽어서(생산) 파싱하고, db에서 데이터를 읽고, 저장하고, 또 다른 무거운 일을 할 때(소비) 발생합니다.

이를 처리하기 위해서 RxJS 에서는 debounce, sampling, throttle 등 다양한 lossy 처리 방법을 지원하고, loss-less 방법으로는 pause, resume 등을 지원 하고있습니다. 하지만 RxJS 로 처리하기에는 복잡하고, 번거로울 때가 있는데 이 때 for await ... of를 사용 하면 간단하게 사용 할 수 있습니다.

RxJS로 처리하는 경우

무거운 작업을 할 때 시스템에 부하가 많이 가고, 실제로 finish가 출력 되는 시간은 모든 line을 처리한 시간이 아니고 생산(produce)이 끝난 시간입니다.

function fsReadLineStream(path) {
return Observable.create((observer) => {
const rl$ = readline.createInterface({
input: fs.createReadStream(path),
})

rl$.on('line', line => {
observer.next(line.trim())
})

rl$.on('end', () => observer.complete())
rl$.on('close', () => observer.complete())
rl$.on('error', (error) => observer.error(error))

return () => rl$.pause()
})
}

function run() {
const file = '/Users/user/Downloads/largeTextFile.txt'
const stream = fsReadLineStream(file)

stream.pipe(
map(convertTo),
).subscribe({
next: async data => {
await insertMany(data) // 무거운 작업
},
complete: () => {
console.log('finish', new Date())
}
})
}

for await … of 를 사용하는 경우

RxJS와는 다르게 무거운 작업인 insertMany가 동시에 실행되지 않고 순차적으로 실행됩니다. 또한 finish 는 모든 line을 처리한 후에 출력됩니다.

async function run() {
const file = '/Users/user/Downloads/largeTextFile.txt'
const fileStream = fs.createReadStream(file)
const stream = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
})

for await (const line of stream) {
const data = convertTo(line)
await insertMany(data) // 무거운 작업
}

console.log('finish', new Date())
}

사내에서 mongoose 를 사용하고 있는데 mongoose 에도 적용 할 수 있습니다.

const stream = Collection.aggregate([{
$match: {
lastModifiedDateTime: {
$gte: startDateTime,
$lt: endDateTime
}
}
}]).cursor({ batchSize: 1000 })

for await (const doc of stream) {
// 무거운 작업
}

참고

--

--