How to Efficiently Sync Data from Google Cloud Storage to MongoDB using Airflow Composer: A Step-by-Step Guide

Ranjeet Shailendra Nagarkar
USF-Data Science
Published Mar 1, 2024 · 12 min read
Fig: GCP Airflow MongoDB Syncing concept

Table of Contents

GCP Bucket Data
Create Airflow Composer Environment
DAG Install Python Libraries
Create DAG File
Upload DAG
Airflow webserver interface
Setup and Connect MongoDB cluster
Connect GCP and MongoDB
Run Airflow DAG
View Airflow logs
View DAG Code on Airflow
Managing your project budget

In this article, we explore the rationale behind synchronizing raw data from a Google Cloud Platform (GCP) bucket with MongoDB, and discuss scenarios where Apache Airflow can be particularly effective for such tasks.

The dataset for this project involves information from a Medium API, a paid service offering a platform for articles and writers. The deliberate choice of a paid service reflects real-world considerations where careful management of API calls and budgeting is crucial. Our primary goal is to develop a recommendation system. This system will recommend articles to users based on either a two-tower recommendation system or based on content similarity to top articles of the writers they are subscribed to. Additionally, we aim to suggest writers to users using a writer-writer content-based recommendation approach. To facilitate this, we are synchronising data from our Google Cloud Platform (GCP) bucket, which holds the raw data, with MongoDB. MongoDB provides a flexible and scalable NoSQL database solution, suitable for handling the dynamic nature of content-related data. In a subsequent article, we will delve into the Exploratory Data Analysis (EDA) conducted on the dataset and the details of the machine learning model employed for the recommendation system. For now, our focus is on the synchronisation tasks and the seamless integration of data between the GCP bucket and MongoDB.

Synchronisation Process: Bridging GCP and MongoDB

In the journey to empower our content-based recommendation system, a pivotal aspect is the synchronisation of data between our Google Cloud Platform (GCP) bucket and MongoDB. This process forms the backbone of our data infrastructure, and its seamless execution plays a crucial role in ensuring efficient data handling.

1. GCP Bucket Data

The Medium data resides in a GCP bucket, housing raw information about Medium articles, writers, and user interactions. This centralised repository serves as the foundation for our recommendation engine. To create a bucket in your GCP project, go to the console page and select the option to create a storage bucket, which then prompts you to:

a) Name your bucket: Give your bucket an appropriate name
b) Select a region: Since this is a personal project, any region will do, for example us-west1 (Oregon)
c) Set the default storage class to Standard
d) Choose to prevent public access and select Uniform access control
Keep every other step at its default unless you want to add further restrictions.

Now when you go to the Cloud storage page it should look like this with the name of the bucket that you just created:

Fig 1: GCP data bucket creation

2. Create Airflow Composer Environment

Now we must enable Cloud Composer. To do so, type Cloud Composer in the search bar and select Enable:

Fig 2: Enable the composer API service for your project

Remember that the API is enabled only for your current project; if you switch to another project, you will have to enable it again.

Once you have enabled the API, you should land on the Environments page, where you can click Create. If you were redirected somewhere else, search for "Environments" and open the Composer ‘Environments’ page, which should look like this:

Fig 3: Environments page

Then we choose the autoscaling option, Composer 2, which automatically scales the number of workers when the environment is under heavy load:

Fig 4: Create Airflow composer environment

Choose an appropriate name and version (preferably the latest) for your environment. You must also grant your service account all the necessary permissions. Creating a Composer environment takes a substantial amount of time, but once it is created you should see a green check status next to it:

Fig 5: Composer Environment successfully created

3. DAG python libraries installation

To use GCSHook and MongoHook in your Airflow environment, follow these steps:

  1. Navigate to the environment you created.
  2. Open the “PyPI packages” tab, the last option in the environment settings.
  3. Add the provider packages that ship the hooks: apache-airflow-providers-google (for GCSHook; typically preinstalled on Composer) and apache-airflow-providers-mongo (for MongoHook).
  4. These libraries enable connectivity and interaction with Google Cloud Storage and MongoDB, respectively.
  5. Once installed, you can use these hooks in your Airflow DAGs to work with GCS and MongoDB.

Fig 6: PYPI packages

Installing libraries on your Composer environment is analogous to installing libraries in your local Python environment. Hence, any library that you import in your DAG files must also be installed on your Composer environment.

4. Create DAG File

To understand a DAG Python file step by step, let's break it down:

  1. Define/Instantiate the DAG object:

from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 3, 18),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}

dag = DAG(
    'example_dag',
    default_args=default_args,
    description='A simple DAG example',
    schedule_interval='@daily',  # Runs the DAG daily; set it to None for manual triggering only
)

The above code instantiates the DAG object with the given parameters. You may change the argument values according to your needs; the values shown here are only illustrative defaults.

2. Define the Python functions (callables) for the tasks:

import json

from airflow.providers.google.cloud.hooks.gcs import GCSHook

def get_gcs_file_content(**kwargs):
    ti = kwargs['ti']
    # google_cloud_storage_conn_id, bucket_name and json_file_path are defined elsewhere in the DAG file
    gcs_hook = GCSHook(gcp_conn_id=google_cloud_storage_conn_id)
    # With no local filename given, download() returns the object content as bytes
    file_content = gcs_hook.download(bucket_name=bucket_name, object_name=json_file_path)

    try:
        file_content_list = json.loads(file_content.decode('utf-8'))
    except json.JSONDecodeError:
        print("Content is not a valid JSON array. Trying to interpret as a string.")
        # If decoding as a list fails, try interpreting the content as a string
        file_content_str = file_content.decode('utf-8')
        # Remove the logging information at the beginning
        json_start_index = file_content_str.find("[")
        if json_start_index != -1:
            file_content_str = file_content_str[json_start_index:]
        file_content_list = json.loads(file_content_str)

    # Log the content for debugging
    print(f"Decoded Content: {file_content_list}")

    # Pre-processing helpers defined elsewhere in the DAG file
    follower_data = upload_to_mongo_follower_data(file_content_list)
    articles_data = upload_to_mongo_articles_data(file_content_list)
    writer_data = upload_to_mongo_writer_data(file_content_list)

    # Push the processed data to XCom
    ti.xcom_push(key='gcs_data_follower', value=follower_data)
    ti.xcom_push(key='gcs_data_article', value=articles_data)
    ti.xcom_push(key='gcs_data_writer', value=writer_data)

In the above code I retrieve the Medium data on followers, articles and writers that I intend to write to MongoDB. Notice the line ti = kwargs['ti']: ti is a keyword argument that Airflow automatically passes to Python functions when they are run through an operator such as ‘PythonOperator’. These keyword arguments contain contextual information and references that Airflow uses internally to execute and manage tasks within a DAG. The task instance ti provides access to various properties and methods related to the task being executed, including XComs (cross-communication) and other task-related information. You can also pass your own parameters using the op_kwargs parameter when defining the PythonOperator. Here, upload_to_mongo_follower_data is a function that I defined for pre-processing the data before pushing it to MongoDB.
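To see the fallback parsing in isolation, here is a minimal sketch that runs without Airflow. The function name parse_json_array and the sample bytes are hypothetical; the logic mirrors the try/except in the task above, with an explicit error when no array is found.

```python
import json


def parse_json_array(raw: bytes) -> list:
    """Parse a JSON array, skipping any non-JSON text that precedes it."""
    text = raw.decode("utf-8")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Fall back to everything from the first "[" onwards.
        start = text.find("[")
        if start == -1:
            raise ValueError("no JSON array found in file content")
        return json.loads(text[start:])


# Hypothetical file content: a log line followed by the JSON payload.
sample = b'INFO fetched 2 records\n[{"id": 1}, {"id": 2}]'
records = parse_json_array(sample)
```

Note that this approach breaks if the leading log text itself contains a "[" character (for example a "[INFO]" prefix), so it is only suitable when the format of the prefix is known.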

from airflow.providers.mongo.hooks.mongo import MongoHook

def uploadtomongo(**kwargs):
    try:
        ti = kwargs['ti']
        # Pull the data from XCom; the key must match one pushed by the upstream task
        # (repeat for 'gcs_data_article' and 'gcs_data_writer' as needed)
        gcs_data = ti.xcom_pull(key='gcs_data_follower', task_ids='get-content-task')
        # Assumes the pushed value is a JSON string
        d = json.loads(gcs_data)
        hook = MongoHook(mongo_conn_id='mongo_default')
        client = hook.get_conn()
        # Access to the Medium Database and collection
        db = client.your_database_name
        test_collection = db.your_collection_name
        print(f"Connected to MongoDB - {client.server_info()}")
        print(d)
        test_collection.insert_one(d)
    except Exception as e:
        print(f"Error writing to MongoDB -- {e}")
        raise  # Re-raise so that Airflow marks the task as failed

In the above code we pull the data that we retrieved from the GCP bucket and insert/write one document at a time to the database and to a collection inside it.
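If the payload holds many documents, writing them in batches is usually cheaper than one insert_one call per document. The sketch below assumes a collection object exposing insert_many, as pymongo collections do; the helper names chunked and insert_in_batches, the batch size, and the FakeCollection stand-in are all hypothetical and exist only so that the example runs without a database.

```python
from typing import Any, Iterable, Iterator, List


def chunked(items: Iterable[Any], size: int) -> Iterator[List[Any]]:
    """Yield successive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    batch: List[Any] = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def insert_in_batches(collection, documents, batch_size: int = 500) -> int:
    """Insert documents via collection.insert_many and return the count."""
    inserted = 0
    for batch in chunked(documents, batch_size):
        collection.insert_many(batch)
        inserted += len(batch)
    return inserted


class FakeCollection:
    """Stand-in for a pymongo collection so the sketch runs offline."""

    def __init__(self):
        self.batches = []

    def insert_many(self, docs):
        self.batches.append(list(docs))


fake = FakeCollection()
total = insert_in_batches(fake, [{"n": i} for i in range(5)], batch_size=2)
```

In the DAG, the collection obtained from the MongoHook client could be passed in place of FakeCollection.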

In the code snippets above you can see xcom_push and xcom_pull. XCom is a feature that allows tasks to exchange messages, small amounts of metadata, or other information. It enables communication between the tasks of the directed acyclic graph (DAG) defined in your DAG Python file. Although XCom is not limited to JSON or dictionaries and can handle various data types, values are usually serialised to and deserialised from JSON by default when stored and retrieved, so the data you push should be JSON-serialisable.
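A quick way to catch serialisation problems before a value reaches XCom is to try encoding it with the standard json module. This is a conservative check under the assumption that your environment uses JSON serialisation for XCom; Airflow's own serialiser may accept some additional types. The function name is hypothetical.

```python
import json
from datetime import datetime


def is_json_serialisable(value) -> bool:
    """Return True if the standard json module can encode the value."""
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        return False


# A plain dictionary of strings and numbers encodes without trouble.
ok = is_json_serialisable({"writer": "abc", "followers": 10})

# A datetime object is rejected by the standard encoder.
bad = is_json_serialisable({"fetched_at": datetime(2024, 3, 1)})
```

Converting such values to strings (for example with isoformat()) before calling xcom_push avoids the problem.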

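3. Define tasks/operators

from airflow.operators.python import PythonOperator

# In Airflow 2 the task context is passed automatically, so provide_context is not needed
get_file_gcs_task = PythonOperator(
    task_id='get-content-task',
    python_callable=get_gcs_file_content,
    execution_timeout=timedelta(minutes=30),
    dag=dag
)

# Second task, referenced in the dependency step below; this task_id is illustrative
upload_to_mongodb = PythonOperator(
    task_id='upload-mongodb-task',
    python_callable=uploadtomongo,
    dag=dag
)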

We limit ourselves to the PythonOperator in this project, but there are several other operators, such as BashOperator, EmptyOperator (formerly DummyOperator), EmailOperator, SimpleHttpOperator and many more. Each of these operators performs a specific kind of task.

4. Define the task dependencies

get_file_gcs_task >> upload_to_mongodb

Using the >> operator, we define the dependencies between tasks. This specifies the order in which tasks should be executed.
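To build intuition for how the >> syntax can express ordering, here is a toy illustration using Python operator overloading. The Task class is a hypothetical stand-in written for this example and is not Airflow's implementation.

```python
class Task:
    """Toy task that records which tasks must run after it."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other: "Task") -> "Task":
        # a >> b registers b as downstream of a and returns b,
        # which is what makes chains such as a >> b >> c possible.
        self.downstream.append(other)
        return other


extract = Task("get-content-task")
load = Task("upload-mongodb-task")
notify = Task("notify-task")

extract >> load >> notify

order = [t.task_id for t in (extract, *extract.downstream, *load.downstream)]
```

Because the expression is evaluated left to right and each step returns the right-hand task, a chain reads in the same order in which the tasks will run.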

5. Upload DAG file

After you create the DAG file locally, you need to run it on Composer. I would also suggest testing the DAG locally first, since running a DAG file on Composer can be time-consuming. Here are the steps to run a DAG on Composer:

a) Go to your Composer bucket and upload your DAG file to the dags/ folder

Fig 8: DAG files folder update

b) If everything went well, your environment page should look like this with a green check:

Fig 9: Checks for all libraries installed

6. Airflow webserver interface

c) At the top of Fig 9, we can see the ‘OPEN AIRFLOW UI’ link. Opening it takes you to the webserver interface, which looks the same as your local Airflow but is hosted remotely rather than locally on 0.0.0.0:8080:

Fig 10: Airflow webserver interface

d) Once you have uploaded your DAG file, it will take some time for your dag_id to appear in the DAGs list. You should also see a default DAG, airflow_monitoring, along with any other DAG files you have.

7. Setup and Connect MongoDB cluster

To actually write the data to MongoDB, we create a MongoDB cluster, a database, and a collection on this cluster. I will be creating a dedicated cluster, which is a paid service offered by MongoDB, but if your data is small you may opt for a shared cluster, which is free. In MongoDB, a dedicated cluster is used exclusively by one client or application, ensuring dedicated resources, while a shared cluster is used by multiple clients or applications that share resources among them.

a) Log in to your MongoDB Atlas account and go to Deployment > Database in the navigation panel on the left.

b) You will see your clusters. Click the ‘Create’ button in the right-hand corner to create a cluster. Now choose the kind of cluster you want (shared, dedicated or serverless) and the cloud provider (where your data will be stored).

Fig 11: Create cluster page

c) Remember the username and password that you set up for your cluster; they are crucial for setting up your connection to MongoDB.
For more information on creating a cluster, refer to the official documentation here.

Now we need to create a database and a collection for our database that we intend to write to from our GCP composer DAG. For more information on database and collection creation refer here.

Fig 12: Database and collection creation

Now we have our database and collection ready.

8. Setup and Connect MongoDB and GCP cloud

Before we run our DAG, we set up secure connections to MongoDB and the GCP bucket so that the tasks can pull and push data.

a) To set up MongoDB connection:

Fig 11: MongoDB connection page

To obtain the MongoDB cluster connection string, click on the connect option for your cluster and go to the drivers option. Your connection string will look as shown in the figure below:

Fig 12: MongoDB connection string

In the host field of your mongo_default connection (under Admin in the Airflow UI), paste the ‘clusterX.example.mongodb.net’ part of the connection string. In the login field, enter the username, and in the password field, the password that you set in the ‘set up connection security’ step when creating your cluster. Remember that this password is not the same as your MongoDB Atlas account password. Note also that the IP address from which your Airflow environment connects must be allowed on the Network Access page in Atlas; otherwise the DAG cannot write the data from your GCP bucket to the MongoDB database.
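To see how the host, login and password fields fit together, the sketch below assembles an Atlas-style connection URI by hand. The function name and the credentials are hypothetical. Atlas driver strings use the mongodb+srv scheme, and special characters in the username or password must be percent-encoded when placed in a URI, which quote_plus from the standard library handles.

```python
from urllib.parse import quote_plus


def build_mongo_uri(username: str, password: str, host: str, srv: bool = True) -> str:
    """Assemble a MongoDB connection URI with percent-encoded credentials."""
    scheme = "mongodb+srv" if srv else "mongodb"
    return f"{scheme}://{quote_plus(username)}:{quote_plus(password)}@{host}/"


# Hypothetical credentials; the host matches the placeholder used in the text.
uri = build_mongo_uri("airflow_user", "p@ss/word", "clusterX.example.mongodb.net")
```

When you fill in the Airflow connection form, the hook builds the URI for you. Depending on the provider version, you may need to indicate in the connection's Extra field that an SRV connection is wanted; treat that as something to verify against the Mongo provider documentation for your version.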

As a temporary measure you can add 0.0.0.0/0 to the list of Network Access IPs, but this is risky, as anyone with your connection string and credentials can then reach your database. You can also create a private connection, but note that the connection string for a private connection is different, so you need to change the host field in your Admin connection. To understand how to set up a private connection, refer here. Be aware that establishing a private connection can incur additional costs, potentially increasing your expenses on the Google Cloud Platform billing side.

b) To set up Google cloud connection:

Fig 13: GCS connection setup

For this step you need to set the connection id (you will use it in your DAG code), set the connection type to Google Cloud, and enter your project id. In this case the project id was already filled in by default with the project that contains the Composer bucket.

The simplest way to authenticate your connection is to copy and paste the contents of your project's keyfile JSON into the ‘Keyfile JSON’ field. To get the keyfile JSON for your project, refer to the official documentation here. Store this keyfile somewhere safe.
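Before pasting the keyfile, it can help to confirm that it is valid JSON and contains the fields a service account key normally carries. The following is a small sketch; the function name, the chosen subset of fields, and the sample values are assumptions made for illustration.

```python
import json

# A subset of the fields normally present in a service account key file.
REQUIRED_KEYS = {"type", "project_id", "private_key", "client_email"}


def missing_keyfile_fields(keyfile_text: str) -> set:
    """Return the required fields that are absent from the keyfile JSON."""
    data = json.loads(keyfile_text)
    return REQUIRED_KEYS - set(data)


# Hypothetical, deliberately incomplete keyfile content.
sample = json.dumps(
    {
        "type": "service_account",
        "project_id": "my-project",
        "client_email": "airflow@my-project.iam.gserviceaccount.com",
    }
)
missing = missing_keyfile_fields(sample)
```

An empty result means the basic fields are present; it does not prove that the key is active or has the right permissions.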

Now your DAG should be ready to run.

9. Run Airflow DAG

a) If you click on your particular dag_id you should see the following page:

Fig 14: DAG Details page

I would suggest initially setting the schedule to manual in your DAG to verify that the DAG file runs correctly; you can then set up a schedule for it. Note that all the parameters such as the DAG schedule and task names are read automatically from your DAG Python file, so you don't need to specify them here. You can manually trigger the DAG by clicking the run button in the right corner and selecting ‘Trigger DAG’. This triggers your DAG, and if everything goes as per your DAG code you should see a green status on each task, as shown on the left side of your Airflow DAG page. To understand the meaning of each status, refer to the status icons located at the top of the page.

Fig 15: Trigger DAG manually

10. View Airflow logs

b) Logs
Logs in Airflow Composer are detailed records of the execution, status, and outcomes of the tasks within your Airflow workflow. They tell you what went wrong with your script so that you can make the necessary changes to your DAG. To understand DAG updates and uploads, refer here.
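Task logs are easier to read when they contain a short summary rather than the full payload. The sketch below uses Python's standard logging module, whose output Airflow also records in the task logs; the helper name summarise_records and the sample data are hypothetical.

```python
import logging

logger = logging.getLogger(__name__)


def summarise_records(records: list, preview: int = 2) -> str:
    """Describe a list of records without dumping all of it into the logs."""
    head = records[:preview]
    return f"{len(records)} records; first {len(head)}: {head}"


# Hypothetical decoded content.
summary = summarise_records([{"id": 1}, {"id": 2}, {"id": 3}])
logger.info("Decoded content: %s", summary)
```

Using log levels (info, warning, error) also makes it simpler to spot failures when scanning a long log.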

11. View DAG Code on Airflow

c) In the ‘Code’ section of your DAG page in the Airflow UI, you can see the Python code that Airflow is executing.

12. Managing your Budget

Note that once you create an environment, you will be charged substantially for every hour that the environment exists. Making calls to the Medium API to scrape data also costs money, as does running a dedicated MongoDB cluster. You should therefore monitor the billing of your project carefully, or you may end up paying more than expected. Let's break down the cost of each part:

  1. Medium API cost: 12,500 API calls cost around $20
  2. Cost for MongoDB and GCP:
    MongoDB does not charge you for a private endpoint connection, but you will incur extra costs on the cloud provider (GCP) side. Below is a graph of my expenses; initially I had a private endpoint connection, and you can see that my networking costs were substantially higher during that period:
Fig 16: GCP services cost graph

3. MongoDB cost
MongoDB cost was mostly dominated by dedicated cluster costs as shown below:

Fig 17: MongoDB costs

I would also suggest deleting the Composer environment when you are not using it and recreating it when you need it again.
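The Medium API price quoted above can be turned into a rough budget estimate. This is a back-of-the-envelope sketch that assumes the cost scales linearly with the number of calls; the figure of 500 calls per day is a hypothetical workload, not a number from the project.

```python
CALLS_PER_BUNDLE = 12_500      # from the pricing quoted in the article
PRICE_PER_BUNDLE_USD = 20.0    # approximate price for that many calls


def api_cost_usd(calls: int) -> float:
    """Estimate the API cost in USD, assuming linear pricing."""
    return calls * PRICE_PER_BUNDLE_USD / CALLS_PER_BUNDLE


# Hypothetical workload of 500 calls per day.
daily = api_cost_usd(500)
monthly = api_cost_usd(500 * 30)
```

At this rate each call costs about $0.0016, so a daily schedule adds up quickly if the DAG makes many calls per run.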

Concluding Remarks:

In this article we discussed the MLOps concepts of scheduling data scraping from the Medium API and writing the data to MongoDB regularly. To summarize:
1) We discussed Airflow Composer on GCP and how to create Airflow connections to MongoDB and the GCP bucket.
2) A challenging part of the project was successfully connecting Airflow Composer to the GCP bucket and MongoDB, as the connection requirements differ between private and public endpoint connections.
3) To connect the GCP bucket to Composer, we discussed the JSON keyfile method.
4) We discussed the components of a DAG file, including the DAG object and operators, as well as the Composer environment and how to install PyPI packages in it.
5) We discussed running the Airflow DAG and concluded with a breakdown of the costs involved in the project.

This comprehensive guide aims to empower data engineers and DevOps professionals with the knowledge to streamline their data operations efficiently on GCP using Airflow.

Ranjeet Shailendra Nagarkar
USF-Data Science

Master in data Science, USFCA, Chemical Engineering, IIT Madras, RBCDSAI, Ex-Gyandata