Working on Bulk Data with MongoDB, Node.js & Streams!
Recently, I had to query a really big data set in MongoDB (~50 million documents), with around 0.5 million documents in the response to be processed and exported to a CSV file in the required format.
Note: If you are in a hurry, the complete solution is attached at the end; otherwise, I recommend going through it step by step.
For a better understanding, we will continue with an example where we need to populate the empty data columns from the database.
So, I am here to make it easier for you if you ever run into something similar. I will split the solution into smaller parts:
- Get query inputs from an .xlsx file (optional)
- Set up the database connection, ready to be fired 🔥
- Custom stream to transform the data (optional)
- CSV stringify stream: to generate CSV-friendly data (optional)
- Export data to the output (process.stdout or a .csv file)
That’s enough, time to code!
Input query data from file
To read data from the .xlsx file, I will be using the xlsx package. Here is a sample input file with some fields to be populated:
Here, we need to populate Name and Email from the User collection in the database. First, we collect the input (userId) from the sheet into an array and use it to query the database.
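Collecting the userIds might look like the following minimal sketch. It assumes the xlsx package is installed, that the file is named input.xlsx, and that the sheet has a userId header column (all of these names are assumptions, not from the original gist):

```javascript
// Sketch: read query inputs from an .xlsx file using the xlsx (SheetJS) package.
// 'input.xlsx' and the 'userId' column name are assumptions.
const XLSX = require('xlsx');

const workbook = XLSX.readFile('input.xlsx');
// Take the first sheet in the workbook
const sheet = workbook.Sheets[workbook.SheetNames[0]];
// Convert the sheet into an array of row objects keyed by the header row
const rows = XLSX.utils.sheet_to_json(sheet);

// Collect the userId column into an array for the database query
const userIds = rows.map((row) => row.userId);
```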
Now, we have the parameters for the Query. Let’s proceed to the next step.
Set up the database connection
It’s time to connect to the database. We will use a connectDB function to initiate the connection and invoke it in the main function (where the magic happens).
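A sketch of that setup, using the official mongodb driver (the connection URI and database name are placeholders, not from the original article):

```javascript
// Sketch using the official MongoDB Node.js driver.
// The URI and database name are placeholder assumptions.
const { MongoClient } = require('mongodb');

const client = new MongoClient('mongodb://localhost:27017');

async function connectDB() {
  // Initiate the connection and return a handle to the database
  await client.connect();
  return client.db('mydb');
}

async function main() {
  const db = await connectDB();
  // ...querying, transforming, and exporting happen here
}

main().catch(console.error);
```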
If you observed carefully, we didn’t handle the database query response with a promise; instead, we used a cursor, which will serve as our ReadableStream.
Using this stream, we can pipe the data flow across as many streams as required: in this case, a custom transform stream, a csv-stringify stream, and the output stream.
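A sketch of that pipeline, assuming a db handle from the connection step and the userIds array from the input step (the collection name, field names, and the downstream transformStream/stringifier variables are illustrative assumptions):

```javascript
// Sketch: query with a cursor and pipe it through the rest of the pipeline.
// Collection and field names are assumptions; transformStream and stringifier
// are hypothetical stages built in the following sections.
const cursor = db
  .collection('users')
  .find({ userId: { $in: userIds } })
  .project({ userId: 1, firstName: 1, lastName: 1, email: 1 });

// cursor.stream() exposes the cursor as a Node.js ReadableStream
cursor
  .stream()
  .pipe(transformStream) // custom Transform stream
  .pipe(stringifier)     // csv-stringify stream
  .pipe(process.stdout); // or a file write stream
```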
Custom Stream for data transformation
This is the most interesting part: we will build a custom stream handler to transform the data using Node’s Transform stream.
In this example, we transform the incoming stream; for instance, we combine firstName and lastName to form Name. After the operation(s), simply pass the data to the this.push()
function. While wiring up a pipe with your own custom streams, keep track of which data type you are passing to the next stream, and whether that type is supported by the next stream in the pipe.
“Some streams only accept restricted data types, such as Buffer or String.”
CSV Stringify stream
Now, we will move forward with another stream, which transforms the data into a CSV-friendly format. Don’t worry, we won’t have to implement it ourselves; the csv-stringify package provides this out of the box 😁
The stringifier returns a readable stream, which we can further pipe into the output stream (stdout or a file).
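Creating the stringifier might look like this minimal sketch; it assumes csv-stringify is installed and that the output columns are Name and Email, matching the example above:

```javascript
// Sketch using the csv-stringify package (npm i csv-stringify).
// The column names follow the article's example.
const { stringify } = require('csv-stringify');

const stringifier = stringify({
  header: true,             // emit a header row first
  columns: ['Name', 'Email'], // order and selection of CSV columns
});
```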
Export data to output stream
At last, we will export the data, chunk by chunk, to the desired output stream. The simplest approach is to use the process.stdout
stream as: stream = stream.pipe(process.stdout)
With this, the whole data set will be streamed to stdout (the terminal), but if we want to store the data in a file instead, here is how:
Conclusion
Now that we have gone through the whole process, it’s time for the output 👀
This is the expected output file after following all the steps, with the data populated in the other columns.
That’s all! 🎉 The link to the complete file is:
Hope you liked this article, clap and follow for more.