for await … of 를 사용하여 stream을 synchronous 하게 처리하는 방법
Published in
5 min readFeb 12, 2020
TL;DR
Node.js 환경에서 backpressure 가 일어나는 경우 for await ... of
를 사용하면 좋습니다.
Backpressure
backpressure는 생산(produce)하는 것보다 소비(consume)하는 것이 느릴 때 문제가 생기는데요.
간단한 예로 일반 텍스트 파일을 읽어서(생산) 파싱하고, db에서 데이터를 읽고, 저장하고, 또 다른 무거운 일을 할 때(소비) 발생합니다.
이를 처리하기 위해서 RxJS 에서는 debounce
, sampling
, throttle
등 다양한 lossy 처리 방법을 지원하고, loss-less 방법으로는 pause
, resume
등을 지원 하고있습니다. 하지만 RxJS 로 처리하기에는 복잡하고, 번거로울 때가 있는데 이 때 for await ... of
를 사용 하면 간단하게 사용 할 수 있습니다.
RxJS로 처리하는 경우
무거운 작업을 할 때 시스템에 부하가 많이 가고, 실제로 finish
가 출력 되는 시간은 모든 line을 처리한 시간이 아니고 생산(produce)이 끝난 시간입니다.
function fsReadLineStream(path) {
return Observable.create((observer) => {
const rl$ = readline.createInterface({
input: fs.createReadStream(path),
})
rl$.on('line', line => {
observer.next(line.trim())
})
rl$.on('end', () => observer.complete())
rl$.on('close', () => observer.complete())
rl$.on('error', (error) => observer.error(error))
return () => rl$.pause()
})
}
function run() {
const file = '/Users/user/Downloads/largeTextFile.txt'
const stream = fsReadLineStream(file)
stream.pipe(
map(convertTo),
).subscribe({
next: async data => {
await insertMany(data) // 무거운 작업
},
complete: () => {
console.log('finish', new Date())
}
})
}
for await … of
를 사용하는 경우
RxJS와는 다르게 무거운 작업인 insertMany
가 동시에 실행되지 않고 순차적으로 실행됩니다. 또한 finish
는 모든 line을 처리한 후에 출력됩니다.
async function run() {
const file = '/Users/user/Downloads/largeTextFile.txt'
const fileStream = fs.createReadStream(file)
const stream = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
})
for await (const line of stream) {
const data = convertTo(line)
await insertMany(data) // 무거운 작업
}
console.log('finish', new Date())
}
사내에서 mongoose 를 사용하고 있는데 mongoose 에도 적용 할 수 있습니다.
const stream = Collection.aggregate([{
$match: {
lastModifiedDateTime: {
$gte: startDateTime,
$lt: endDateTime
}
}
}]).cursor({ batchSize: 1000 })
for await (const doc of stream) {
// 무거운 작업
}