How we synced data from Salesforce into BigQuery at NestAway
A data warehouse is built by integrating data from multiple heterogeneous sources, and it supports analytical reporting, structured and/or ad hoc queries, and decision making. “Why not include Salesforce?”
How we synced our CRM platform, Salesforce, into BigQuery (our data warehouse) at NestAway. Article co-authored with Sai Rohith.
Let's understand the problem:
Keeping data in sync between systems is very important when the main app, third-party apps, and CRM tools depend heavily on one another. Here at NestAway, we use Salesforce to handle operations, and we needed that data in our data warehouse for analytics and machine learning.
The question now is: how do we get a data pipeline ready for this kind of real-time analytics on Salesforce data?
Solution:
There are many third-party tools available in the market, such as StitchData, Alooma, and Heroku.
We already had an Airflow-based data pipeline that syncs our databases to BQ. Hence, we built our own Airflow data pipeline to sync data from Salesforce to BigQuery.
Let's deep dive into the architecture to understand it better.
We developed two services in-house and deployed a workflow management tool:
Components
1. Hippogriff (A poller agent): (ignore the names😅, we always keep them fancy)
A Spring Boot application responsible for pulling data from both MySQL/PostgreSQL databases and the Salesforce Apex HTTP APIs in a multi-threaded environment. It stores the data in JSON files (newline-delimited format) and then uploads them to a GCS bucket.
2. Crane (A loader agent to BQ):
A Rails application responsible for loading files from the GCS bucket into the BQ dataset, taking care of table schemas that change in real time and handling multiple load requests.
3. Apache Airflow:
We use Apache Airflow to orchestrate our batch processes and to run regularly scheduled (cron) jobs, ML pipelines, and data pipelines.
Basic architecture
How is Salesforce data retrieved?
Salesforce uses OAuth 2.0 authentication, which enables secure login. We used the Force.com REST API connector https://github.com/jesperfj/force-rest-api as a dependency to connect and fetch data. With it, you need not worry about generating the auth token or refreshing it after expiration; the connector handles that internally.
Let's say we have one SObject to fetch from SF, Lead, with 1000 records.
The URL to fetch a single SObject record would be something like
https://company.visual.force.com/services/data/v43.0/sobjects/Lead/123456
To fetch, say, 1000 records this way, we would need to make this HTTP call about 1000 times. Insane!?💀 A better way is to partition the data by date-time and fetch it using the query format.
The URL to fetch SObjects with a query and a date would look like
https://company.visual.force.com/services/data/v43.0/query/?q=SELECT+Id,FORMAT(CreatedDate)+FROM+Lead+WHERE+CreatedDate+>+2018-11-25T00:00:00Z
Here Salesforce returns the data in batches of 200 records per request, along with a URL for the next 200 records.
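The batch-following loop described above can be sketched in a few lines of Python. This is only an illustration, not Hippogriff's actual code (which is a Spring Boot application using force-rest-api). `get_json` is a hypothetical helper that performs the authenticated GET and returns the parsed JSON body; the `records`, `done`, and `nextRecordsUrl` keys are the ones the Salesforce query endpoint returns.

```python
from typing import Callable, Dict, Iterator
from urllib.parse import quote_plus

API_PREFIX = "/services/data/v43.0"  # API version taken from the URLs above


def build_query_path(soql: str) -> str:
    """Return the REST path for a SOQL query, URL-encoding the statement."""
    return f"{API_PREFIX}/query/?q={quote_plus(soql)}"


def fetch_all(get_json: Callable[[str], Dict], soql: str) -> Iterator[Dict]:
    """Yield every record of a query, following nextRecordsUrl until done.

    get_json is a hypothetical helper: it takes a path, performs the
    authenticated GET against the Salesforce instance, and returns the JSON.
    """
    path = build_query_path(soql)
    while path:
        page = get_json(path)
        yield from page["records"]
        # While more batches remain, "done" is false and "nextRecordsUrl"
        # points at the next batch.
        path = None if page.get("done", True) else page.get("nextRecordsUrl")
```

Because `fetch_all` is a generator, records can be written to the output file as they arrive instead of being held in memory all at once.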
The question now is where to get the column names that go into the query. A describe call on the SObject returns them, and you can pick the columns you need to fetch.
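As a rough sketch of that step: a describe response (for example from `/services/data/v43.0/sobjects/Lead/describe/`) carries a `fields` list whose entries have a `name`. The helper names below are hypothetical, and the `CreatedDate` filter simply mirrors the example URL above.

```python
from typing import Dict, List


def columns_from_describe(describe: Dict) -> List[str]:
    """Return the field names listed in an SObject describe response."""
    return [field["name"] for field in describe["fields"]]


def build_soql(sobject: str, columns: List[str], since_iso: str) -> str:
    """Build a SOQL statement for records created after since_iso."""
    return (
        f"SELECT {','.join(columns)} FROM {sobject} "
        f"WHERE CreatedDate > {since_iso}"
    )
```

For a describe response listing `Id` and `Email`, this produces `SELECT Id,Email FROM Lead WHERE CreatedDate > 2018-11-25T00:00:00Z`, which then goes into the `q` parameter of the query URL.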
HTTP calls to SF before: 1 auth + 1000 records = 1001 calls
HTTP calls to SF after: 1 auth + 1 describe (not required if you know which columns to fetch) + 1000/200 = 7 calls
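The same arithmetic for any record count, as a small sketch. The function names are made up for illustration; the batch size of 200 is the one quoted above, and a partial last batch still costs one call, hence the ceiling.

```python
import math


def calls_record_by_record(records: int) -> int:
    """One auth call plus one GET per record."""
    return 1 + records


def calls_with_query(records: int, batch_size: int = 200, describe: bool = True) -> int:
    """One auth call, an optional describe call, then one GET per batch."""
    return 1 + (1 if describe else 0) + math.ceil(records / batch_size)
```

With 1000 records, the first function gives 1001 and the second gives 7, matching the numbers above.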
Now let's say there are 10 SObjects to fetch, each with 1000 records. Hippogriff runs in a multi-threaded environment where each thread is responsible for one SObject/table (see the architecture above for more info).
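The one-thread-per-SObject idea looks roughly like this. Hippogriff itself is a Spring Boot application, so this Python version only illustrates the pattern; `fetch_one` is a hypothetical function that pulls all records for a single SObject.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Iterable, List


def fetch_sobjects(
    names: Iterable[str],
    fetch_one: Callable[[str], List[dict]],
    max_workers: int = 10,
) -> Dict[str, List[dict]]:
    """Fetch each SObject on its own worker thread and collect the results."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {name: pool.submit(fetch_one, name) for name in names}
        # result() re-raises any exception thrown inside the worker thread,
        # so a failed SObject is not silently dropped.
        return {name: future.result() for name, future in futures.items()}
```

Threads suit this workload because the time is spent waiting on HTTP responses rather than on CPU work.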
Files are first uploaded to a temp folder in GCS to guard against network latency and bulk upload failures. Once uploaded to the temp folder, they are moved to active_jobs in the same bucket.
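A minimal sketch of the upload-then-move step, assuming `bucket` behaves like a `google.cloud.storage.Bucket` (its `blob`, `upload_from_filename`, and `rename_blob` methods are used). The `temp/` prefix is an assumed folder name; `active_jobs` is the one mentioned above.

```python
def upload_then_activate(bucket, local_path: str, file_name: str) -> str:
    """Upload to temp/ first, then move the finished object to active_jobs/.

    bucket is expected to behave like google.cloud.storage.Bucket.
    Returns the final object name.
    """
    temp_blob = bucket.blob(f"temp/{file_name}")  # "temp/" is an assumed prefix
    temp_blob.upload_from_filename(local_path)
    # rename_blob copies the object to the new name and deletes the old one,
    # so only fully uploaded files ever appear under active_jobs/.
    active_blob = bucket.rename_blob(temp_blob, f"active_jobs/{file_name}")
    return active_blob.name
```

If the upload raises, the function never reaches the move, so nothing half-finished lands in the folder the loader reads from.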
Using an Airflow DAG, we run the first task as a KubernetesPodOperator, which spawns a k8s pod from a Docker image to do this kind of resource-intensive work.
Must read for this: https://medium.com/bluecore-engineering/were-all-using-airflow-wrong-and-how-to-fix-it-a56f14cb0753
Airflow runs this task every 15 minutes: it brings up the k8s pod (the Hippogriff poller agent), which pulls the data from SF, stores the records in JSON files with a newline as the delimiter, and uploads them to Google Cloud Storage.
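For readers unfamiliar with the format, newline-delimited JSON is simply one JSON object per line. A small sketch of the serialization follows; dropping the `attributes` metadata that Salesforce attaches to each query record is an assumption here, not something the pipeline is known to do.

```python
import json
from typing import Iterable


def to_ndjson(records: Iterable[dict]) -> str:
    """Serialize records as newline-delimited JSON, one object per line."""
    lines = []
    for record in records:
        # Assumption: strip the per-record "attributes" metadata that
        # Salesforce query results carry, keeping only the column values.
        row = {key: value for key, value in record.items() if key != "attributes"}
        lines.append(json.dumps(row, separators=(",", ":")))
    return "\n".join(lines) + ("\n" if lines else "")
```

This is the layout BigQuery load jobs expect for JSON sources, which is why the poller writes files this way.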
Once the upload is done, the next task in the Airflow DAG calls Crane (the loader agent), which starts loading the files from GCS into Google BigQuery.
Once the data is present in GCS, it is loaded into BQ in multiple phases:
1) Making BQ equivalent schema based on SF-BQ datatypes mapping.
2) Update the schema of the tables in BQ if the schema of any Salesforce SObject has changed.
3) Use the BQ load job service to load files into temporary tables in BQ by passing the GCS file URL and the updated BQ schema.
(You might wonder: why copy data to temp tables in BQ instead of loading directly into the source tables?)
The BQ load job service guarantees atomicity only at the file level, not across a batch of files. Assume we have a batch of 2 files belonging to 2 tables: if one load succeeds and the other fails, loading directly would leave one table updated and the other stale.
We always want all of our tables (2 in this case) to be up to date in BQ, not just some of them.
4) Copy all the temporary tables to the source tables in BQ (e.g. temp_lead to lead) and then drop the temp tables.
5) Do any post upload tasks if needed (like sending batch success/failure notification, creating views in BQ, validation of data between BQ and source DB, etc.)
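Phases 1, 2, and the all-or-nothing rule behind phases 3 and 4 can be sketched as below. Crane is a Rails application, so this Python is illustrative only. The type mapping is an assumed example rather than our actual SF-BQ mapping, and the function names are hypothetical.

```python
from typing import Dict, List

# Assumed mapping from Salesforce describe field types to BigQuery column
# types; anything not listed falls back to STRING.
SF_TO_BQ = {
    "boolean": "BOOLEAN",
    "int": "INTEGER",
    "double": "FLOAT",
    "currency": "FLOAT",
    "percent": "FLOAT",
    "date": "DATE",
    "datetime": "TIMESTAMP",
}


def bq_schema(describe_fields: List[Dict]) -> List[Dict]:
    """Phase 1: translate describe fields into BigQuery schema entries."""
    return [
        {
            "name": field["name"],
            "type": SF_TO_BQ.get(field["type"], "STRING"),
            "mode": "NULLABLE",
        }
        for field in describe_fields
    ]


def new_columns(current: List[Dict], wanted: List[Dict]) -> List[Dict]:
    """Phase 2: columns present in the Salesforce schema but missing in BQ."""
    existing = {column["name"] for column in current}
    return [column for column in wanted if column["name"] not in existing]


def tables_to_promote(load_results: Dict[str, bool]) -> List[str]:
    """Phases 3-4: promote temp tables only if every load in the batch succeeded."""
    if not all(load_results.values()):
        return []
    return sorted(load_results)
```

If `tables_to_promote` receives `{"lead": True, "account": False}`, it returns an empty list, so neither temp table is copied and the source tables stay consistent with each other.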
Challenges (and how we overcame them):
We faced a lot of challenges with infra, resources, network, etc., since migrating data between systems is a very resource-intensive task and we wanted to keep the pipeline running every 15 minutes. Here are some of them:
- Running KubernetesPodOperator in-cluster (we sat with DevOps to run the K8s operator in-cluster without passing any Helm charts or a k8s config file into Airflow)
- Validating the data that has migrated to BQ (we added more DAGs that run every night and validate the data migrated the previous day)
- Upload/move failures in GCS due to network latency (we added last_sync_time, Slack integration, and retries to all the processes so we never miss a batch of data during network downtime)
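The last point, retries combined with last_sync_time, boils down to one rule: advance the marker only after a window has synced successfully. A hedged sketch follows; `sync` and `notify` are hypothetical callables (for example, the poller run and a Slack alert), and the retry count and delay are assumed values.

```python
import time
from typing import Callable, Dict


def run_with_retries(task: Callable[[], None], attempts: int = 3, delay_s: float = 5.0) -> bool:
    """Run task up to `attempts` times; return True on the first success."""
    for attempt in range(1, attempts + 1):
        try:
            task()
            return True
        except Exception:
            if attempt < attempts:
                time.sleep(delay_s)
    return False


def sync_window(
    state: Dict[str, str],
    now: str,
    sync: Callable[[str, str], None],
    notify: Callable[[str], None],
    attempts: int = 3,
    delay_s: float = 5.0,
) -> None:
    """Sync the window from last_sync_time to now; move the marker only on success."""
    start = state["last_sync_time"]
    if run_with_retries(lambda: sync(start, now), attempts, delay_s):
        state["last_sync_time"] = now
    else:
        # The marker stays put, so the next run retries the same window
        # and no records are skipped.
        notify(f"sync failed for window {start} .. {now}")
```

Because a failed run leaves last_sync_time untouched, the next 15-minute run simply covers a longer window and picks up whatever was missed during the downtime.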
Conclusion:
Having Salesforce data synced into BQ is very useful for analytics and machine learning. Data pipelines sound easy, but as you can see, they are pretty complex to develop. Airflow is one of the tools that helped us develop rapidly and made our day-to-day operations easier. 🙂
Last but not least, special thanks to my colleague Sai Rohith for contributing to this project 🙌
Also the core members of Data-Engineering team at NestAway: Rohit Pai and Sonali Dave.
Have any questions or thoughts? Let us know in the comments below! 👇
Happy coding.