Data migration from MongoDB Atlas to BigQuery using Airbyte and handling the Raw Data (JSON) using Custom Queries

Piyush Bajaj
Google Cloud - Community
8 min read · Aug 5, 2022

Data migration is the process of moving data from one location to another, one format to another, or one application to another. In this blog, we will see how we can migrate data from MongoDB, which is a NoSQL database, to BigQuery.

BigQuery offers features like machine learning, geospatial analysis, and business intelligence to help you manage and analyze your data. BigQuery's serverless architecture lets you use SQL queries to answer your organization's questions.

We will be using Airbyte to migrate our data from MongoDB to BigQuery.

Introduction to Airbyte:

Airbyte is an open-source data pipeline platform that serves as an alternative to Stitch Data and Fivetran. Although these existing data pipeline platforms offer a significant number of integrations with well-regarded sources like Stripe and Salesforce, they leave a gap around smaller, long-tail service integrations.

Why should we choose Airbyte?

  • Offers easy deployment of a free Community Edition.
  • Has more than 150 sources, and supports all major data warehouses, lakes, and databases as destinations.
  • Connectors on Airbyte run in Docker containers, which allows them to operate independently.
  • Gives you the functionality to monitor every one of your connectors, refresh them as needed, and schedule updates.

Sync modes

  1. Full Refresh Overwrite: Sync the whole stream and replace data in the destination by overwriting it.
  2. Full Refresh Append: Sync the whole stream and append data to the destination.
  3. Incremental Append: Sync new records from the stream and append data to the destination.
  4. Incremental Deduped History: Sync new records from the stream and append them to the destination, while also providing a de-duplicated view mirroring the state of the stream in the source.

Normalization and Transformation

  1. Raw Data (JSON): Airbyte only pushes the data into raw tables, where each record is stored as a JSON string.
  2. Normalized tabular data: Airbyte pushes data into the raw tables, then uses its own queries to push the data first to a temporary table (in normalized tabular format) and then uses merge scripts to push the data into the final table.

Business Use Case:

MongoDB is a well-known NoSQL database that requires data to be modeled in “JSON format”. If the data model of your application is a natural fit for MongoDB’s recommended data model, it can provide good performance, flexibility, and scalability for transactional workloads.

However, when it comes to analytics, MongoDB is not a good solution: it lacks proper joins, transferring data from other systems into MongoDB is difficult, and it has no native SQL support. Writing complex analytics logic in MongoDB's aggregation framework is not as easy as it is in SQL.

As a result, there are strong use cases for streaming data from MongoDB to BigQuery and then using BigQuery for additional analytical and ML scenarios.

Why should we use custom queries when Airbyte itself can handle everything?

The Incremental Append/Incremental Dedup mode is often chosen as the sync method for most business use cases, and data is typically pushed straight to the main table using the Normalized tabular data option from Normalization.

As with many ETL/ELT tools, we have no control over the merge queries that Airbyte builds in the backend, so using those queries as-is can end up costing a lot in BigQuery.

In this blog, we'll look at how to push only the raw data by choosing Raw Data (JSON) as the normalization option while using Airbyte's Incremental Append/Incremental Dedup mode. Custom queries will handle everything else, which will reduce BigQuery costs.

Figure 1. Raw Data(JSON) Sample

Just to add: when using the normalization feature, Airbyte generates approximately seven tables for a single final table, of which only three are required; the rest are created by Airbyte for internal purposes. Airbyte keeps all of the staging, raw, and final tables in a single dataset, which can cause a lot of confusion when we have a large number of tables.

Steps:

  1. Installing Airbyte on a GCP Compute Engine instance.

Reference link: https://docs.airbyte.com/deploying-airbyte/on-gcp-compute-engine/


Figure 2. Airbyte User-Interface

2. Setting up a Source

  • Click on Source from the left pane and then New Source.
Figure 3. Setting up the source
  • Fill in all the details and click on Setup source.
Figure 4. Setting up the source
Figure 5. Setting up the source

3. Setting up a destination

  • To set up the destination, click on Destinations in the left pane and then choose New destination in the top-right corner.
Figure 6. Setting up the destination
  • Fill in all the details.
Figure 7. Setting up the destination
Figure 8. Setting up the destination
Figure 9. Setting up the destination

4. Setting up connections.

  • Click on Connections in the left pane and then click on New connection in the top-right corner.
Figure 10. Setting up the connection
  • Choose your source, click on "Use existing source", and do the same for the destination.
Figure 11. Setting up the connection
  • Choose sync mode, cursor field, and primary key.

Cursor Field: The cursor field is the field used to fetch incremental data from the MongoDB Atlas server.

Figure 12. Setting up the connection
  • Last is the Normalization and Transformation section: we are using Raw Data (JSON) mode for our use case.
Figure 13. Setting up a connection (Normalization method).

5. Click on Set up connection.

Figure 14. Setting up the connection
  • Now you have the raw data in BigQuery:

_airbyte_ab_id and _airbyte_emitted_at are columns added by Airbyte at the time of pushing our data to BigQuery, while _airbyte_data holds the actual data that we need.
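For instance, you can take a quick look at what Airbyte landed (the table name follows Airbyte's _airbyte_raw_<stream> naming convention, and the Demo.test dataset here matches the examples later in this blog):

SELECT
  _airbyte_ab_id,      -- unique record id assigned by Airbyte
  _airbyte_emitted_at, -- when Airbyte wrote the record
  _airbyte_data        -- the MongoDB document as a JSON string
FROM
  `Demo.test._airbyte_raw_A`
LIMIT 10;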

Figure 15. Raw Table

Custom Normalizations and Transformations:

Since Airbyte only pushes data to a raw table, we will write our own script to push the raw data to a normalized table first and then, at a later stage, to a main table that contains all the merged data. To minimize the query scan and lower the cost on the BigQuery end, we will maintain partitioning in the staging and final tables.
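As a sketch of what that partitioning could look like, here is one possible shape for the final table (the staging table can share the same schema). The columns mirror the extraction query below; the TIMESTAMP types assume the date fields are parsed from the raw JSON during normalization, since JSON_EXTRACT returns strings:

CREATE TABLE IF NOT EXISTS `Demo.test.final_A`
(
  _id          STRING,
  Address      STRING,    -- nested document kept as a JSON string
  created_at   TIMESTAMP, -- immutable insert time of the document
  updated_date TIMESTAMP, -- cursor field used for incremental syncs
  user_id      STRING
)
-- Partitioning on the immutable created_at lets the merge query safely
-- bound its scan of this table (see the merge sketch later).
PARTITION BY DATE(created_at);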

Figure 16. Normalization and Transformation
  • Normalizing Data Query:

The query below extracts data from the raw table and pushes it to the staging table in normalized form. For this step, we're going to use the JSON_EXTRACT() function.

INSERT INTO `Demo.test.A_temp`
SELECT
  JSON_EXTRACT(_airbyte_data, "$['_id']") AS _id,
  JSON_EXTRACT(_airbyte_data, "$['Address']") AS Address,
  JSON_EXTRACT(_airbyte_data, "$['created_at']") AS created_at,
  JSON_EXTRACT(_airbyte_data, "$['updated_date']") AS updated_date,
  JSON_EXTRACT(_airbyte_data, "$['user_id']") AS user_id
FROM
  `Demo.test._airbyte_raw_A`
Figure 17. Staging Table
  • Sample Merge query

The last step is to run a merge query and push the entire data to the final table. Before pushing the data, we can check for duplicate rows and, if there are any, use the ROW_NUMBER() function for de-duplication. Also, to limit the data scan, we can take the min and max timestamp column values from the staging table and push them into the ON condition of the merge query. This helps lower the scanning cost, since we scan only the required portion of the final table (a concrete sketch follows the syntax template below).

MERGE <target_table> [AS TARGET]
USING <table_source> [AS SOURCE]
ON <search_condition>
[WHEN MATCHED
THEN <merge_matched> ]
[WHEN NOT MATCHED [BY TARGET]
THEN <merge_not_matched> ]
[WHEN NOT MATCHED BY SOURCE
THEN <merge_matched> ];
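As a concrete illustration of the template above, here is a minimal sketch against the example tables, assuming the schema sketched earlier: _id as the primary key, created_at immutable and used for partitioning, and updated_date as the cursor. Because created_at never changes for a given document, bounding the target scan by the staging table's min/max created_at still finds the existing versions of updated rows:

DECLARE min_ca, max_ca TIMESTAMP;

-- Scan bounds taken from the staging table, as discussed above
SET (min_ca, max_ca) = (
  SELECT AS STRUCT MIN(created_at), MAX(created_at)
  FROM `Demo.test.A_temp`
);

MERGE `Demo.test.final_A` AS target
USING (
  -- De-duplicate staging rows, keeping the latest version per _id
  SELECT *
  FROM `Demo.test.A_temp`
  WHERE TRUE  -- BigQuery requires WHERE/GROUP BY/HAVING alongside QUALIFY
  QUALIFY ROW_NUMBER() OVER (PARTITION BY _id ORDER BY updated_date DESC) = 1
) AS source
ON target._id = source._id
   AND target.created_at BETWEEN min_ca AND max_ca  -- prunes partitions
WHEN MATCHED THEN UPDATE SET
  Address      = source.Address,
  updated_date = source.updated_date,
  user_id      = source.user_id
WHEN NOT MATCHED THEN INSERT
  (_id, Address, created_at, updated_date, user_id)
VALUES (source._id, source.Address, source.created_at,
        source.updated_date, source.user_id);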
Figure 18. Final Table

Setting up an incremental pipeline:

  • This method is divided into two parts.
  1. Syncing data from MongoDB to BigQuery:
    This is handled by Airbyte. Airbyte only retrieves new data based on the cursor field defined in Airbyte’s UI.
  2. Writing your own queries to sync data from raw tables to the final tables:
    As previously discussed, we can write our own queries and schedule them in BigQuery using the scheduled queries option to sync all data from the raw table to the final table.

In BigQuery, we can set up the incremental load using the metatable concept.

A metatable is simply a table that holds metadata about other tables. We use the metatable to store the final table's maximum updated_date. The normalization query uses this information to pick up only rows whose updated_date is greater than the value stored in the metatable.
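Putting this together, the scheduled normalization query can read the watermark from the metatable so that each run only touches new raw rows. A sketch reusing the earlier example names (the TIMESTAMP(JSON_EXTRACT_SCALAR(...)) casts assume the dates arrive as parseable strings; adjust them to your actual format):

INSERT INTO `Demo.test.A_temp`
SELECT
  JSON_EXTRACT(_airbyte_data, "$['_id']") AS _id,
  JSON_EXTRACT(_airbyte_data, "$['Address']") AS Address,
  TIMESTAMP(JSON_EXTRACT_SCALAR(_airbyte_data, "$['created_at']")) AS created_at,
  TIMESTAMP(JSON_EXTRACT_SCALAR(_airbyte_data, "$['updated_date']")) AS updated_date,
  JSON_EXTRACT(_airbyte_data, "$['user_id']") AS user_id
FROM
  `Demo.test._airbyte_raw_A`
-- Only rows newer than the watermark stored in the metatable
WHERE TIMESTAMP(JSON_EXTRACT_SCALAR(_airbyte_data, "$['updated_date']")) >
      (SELECT max_updated_at FROM `test.metatable` WHERE table_name = 'final_A');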

Figure 19. Normalization and Transformation

A sample metatable update query:

UPDATE `test.metatable`
SET max_updated_at = (SELECT MAX(updated_date) FROM `Demo.test.final_A`)
WHERE table_name = 'final_A'
Figure 20. Metatable

Challenges identified while using Airbyte:

  • When a new column or table is added, the entire dataset must be resynced.
  • Airbyte does not support deletion.
  • Airbyte is unable to handle complex data in the form of a nested record. For example, if you have an address column of the form {"Name": "jack", "phno": xxxxxxx, "Address": {"full name": "jack mathew", "address line 1": "northern street", "pincode": 123432}}, Airbyte will set the address column to NULL.
  • There is no way to skip historical load and only run incremental load.

Conclusion:
In this blog, we explored how to migrate data from MongoDB Atlas to BigQuery using Airbyte and how to handle the Airbyte raw table data using custom queries.

Thank you so much for reading this blog. Happy deployments ☁️😄

I would like to give special thanks to Priyanka Raikwar, Parth Gangrade, Tanmay Sakpal, and Shreya Goel for their contributions and for motivating me to write this blog :)
