Data Pipelines at PasarPolis using Airflow and BigQuery

Pramod Bisht
Pasarpolis Product + Tech
8 min readMay 27, 2020

As discussed in the previous blog — Data Analytics and Insights, Airflow met the following criteria to become our choice of ETL (extract-transform-load) platform -

  1. Ease of use when it comes to creating pipelines
  2. Visibility into scheduled jobs and pipelines — Historic runs and current progress
  3. Ability to integrate Python code for performing transformations
  4. Connectivity with our existing systems

Airflow and Cloud Composer

These are the key terms that one has to familiarize themselves with when working on Airflow:

  1. DAG — Directed Acyclic Graph is used to define the flow of data or logic from one node to another. Each DAG represents a data pipeline. It can be visualized as a tree view or as a Graph as shown in the image above.
  2. Hook — It acts as a connection between the Airflow ecosystem and the external services like a database, or storage services like GCS and S3.
  3. Operator — It is an execution unit, a DAG consists of many of these units chained together.
  4. PythonOperator, BashOperator, MySQLToGoogleCloudStorageOperator are examples of commonly used operators.

One can create custom operators too. We created custom operators to perform actions like — Send email notification, multipart export to storage from BigQuery. The purpose of creating a custom operator is to either tweak the behavior of existing operators or create an interface to the new service or platform.

  1. Tasks — Tasks are the parameterized instantiated objects of the operator.
  2. Variables — Variables are Airflow’s internal key-value store, in which data can be stored that could be accessed from the same DAG or another DAG. They are often used to store DAG configuration.
  3. Connections — This is Airflow’s abstraction to take care of databases or other resource connections. It takes care of creating new connections when earlier connections turn stale. A single connection can be mapped to multiple databases to ensure fault tolerance when the database server is restarted or it goes down. It ensures that a pipeline creator need not provide connection details every time he/she wants to connect to a data source. Besides that, connections are stored in such a way that it is not available in plain text to the pipeline developers.
  4. Configuration — There are various airflow settings that can be tweaked as per the requirements. The following are some of the important configuration parameters:

DAG_concurrency: This limits the number of task instances allowed to run concurrently within a specific dag. We set it at 15.

Retry_delay: It decides the delay between retries for a given task.

Parallelism: It decides the maximum number of tasks that can run in parallel across all DAGs at a point of time. This setting is applicable across the cluster. We set it at 30.

Start_date: Make sure to take care of the ‘start_date’ parameter in DAG it should be a static date in the past or a dynamic allocation using airflow.utils.dates.days_ago(X). X should be greater than the interval of the cron. For example, if it is a daily job then X>= 2. If the cron is not running as expected refer to this excellent article.

There are more settings that one can tweak in the config file, but we left all of those at their default values.

You can refer to this link — Airflow official tutorial — to understand what a canonical DAG/Python file structure looks like.

What is Cloud Composer?

Google Cloud Platform provides managed/hosted service of Apache Airflow, called Cloud Composer. We chose the machine type — n1-standard-2. It has 2 vCPUs and 7.5 GB of RAM. It has to be a multi-node setup, with a minimum number of nodes that can be set up restricted at 3. They are homogeneous machines orchestrated by Google Kubernetes Engine. We did not enable auto-scaling because it isn’t required. Currently, our Airflow Google Kubernetes Engine cluster is underutilized. The services that run on the cluster in Kubernetes pods and containers are -

  1. Web Server that powers the admin GUI
  2. Cloud SQL | MySQL database that stores Airflow metadata
  3. Redis as the message broker for Celery Executors that control the workers
  4. Scheduler schedules code as per the cron defined per DAG
  5. A worker performs the compute defined within each DAG

It is GCP’s wrapper around Airflow.

We chose this over a self-managed deployment of Airflow on GCP virtual machines because -

  1. Composer is GCP flavored Airflow. It is a single click deployment of Apache airflow on the Kubernetes ecosystem. Cloud Composer has made few patches on Apache airflow to ensure its stability.
  2. It has G-Auth integration
  3. It is easy to set up and use Continuous Integration
  4. Its logs are available for review and analysis on Stackdriver
  5. Minimal configuration is required to make it talk to Google services like GCS, BigQuery, Pubsub, etc. Such abstractions are ready to use out-of-the-box in the form of Operators and Hooks.

Typical pipeline requirements

  1. Move application data from Cloud SQL to BigQuery. It goes into the Raw schema first, and then into Staging. There are no transformations from SQL to Raw, but there are a few/minimal changes when data is moved from Raw to Staging. These two are sequential steps.
  2. Prepare the Reporting schema tables which are in turn used as data sources for downstream reporting and analysis. We do quite a bit of cleaning, transformation, aggregation, and denormalization when the data is moved from Staging to Reporting.
  3. Run scheduled reports that our operations, finance, and product teams get as Excel or CSV attachments with emails. Check this link to know more about how we simplified this aspect using Airflow.
  4. Crunch data for telemetry applications. We ingest the incoming files at GCP Firebase storage. They contain timestamped event data at user-session grain. We run heavy math on that data to get calculated values for metrics like — acceleration, braking, focus, consistency, turns, etc.

Data Warehouse on BigQuery using Airflow

Move application data from Cloud SQL to BigQuery

This process is broken down into 2 DAGs. The first DAG runs queries on the MySQL tables from which we want to pull the data. It fetches the row counts per table. Then it triggers the second DAG with specific instructions around operator creation per table.

We use the row counts to break the processing of tables into units of 100,000 rows maximum per task. If a table has 250,000 rows that we have to read from Cloud SQL and write into BigQuery, we create 3 tasks on the second DAG on the fly to do this execution.

It is important because some tables have row counts in 10s of millions. Each table is processed in a single task. We want to keep the size of a task as homogeneous as possible. That allows us to control and predict the Composer Cluster’s resource allocation accurately when designing DAGs.

With that information available, we trigger the next DAG which has the batches identified per table. It looks like this.

Why did we use BigQuery Operator to fetch information from Cloud SQL? BigQuery provides an option to connect to Cloud SQL using a federated connection. That way we don’t have to do any export to disk and then import to BigQuery. Very convenient. We created these connections at the panel called ‘External Connections’ in the BigQuery console shown below.

There are two types of data pipelines that we have -

  1. Full truncate and load — We export our tables content directly to Big Query using the WRITE_TRUNCATE method. It ensures that the existing table is truncated, and then recreated before inserting data. It recreates the entire table.
  2. Delta pull — We export only the newly created and updated rows to GCP storage in a fixed defined interval of time, in our case it’s hourly. We use the WRITE_APPEND method to write files content to a temp table, which is subsequently merged to the main table using Bigquery MERGE operation.

The choice between the two approaches was made for each table that had to be moved into BigQuery. We looked at the rate of updates and inserts per table per day in descending order. The top 20% of those tables were implemented as delta-pulls.

These newly created and updated tables are replicas of the databases and tables on Cloud SQL.

Prepare reporting schema tables

These are the tables that are used as data sources for downstream reporting and analysis. Metabase, Tableau, and Excel spreadsheets fetch information from the Reporting dataset.

As we move the data from Staging to the Reporting layer, we -

  1. Extract useful key-value pairs from document fields of JSON/LONGTEXT type
  2. Denormalize the data, by performing joins between relevant tables
  3. Filter it to put in apt tables
  4. Partition based on date fields based on how specific subset are queries
  5. Rename fields to make them more user-friendly
  6. Perform aggregations and save their output

This is a BigQuery to BigQuery operation wherein we move data from one dataset to another. It is accomplished by using the standard BigQueryOperator.

It is a full-truncate-load job for all tables.

Implied pipeline requirements

External trigger to run ETL

There are situations wherein we have to set up external triggers to execute the actions defined in Airflow DAGs. An example of that is a dashboard that we have to prepare on Excel only when the end-user uploads an input file. There is no fixed schedule for the availability of the input file. As soon as it is found at a particular path (GCS bucket), we trigger a cloud function. That cloud function triggers the DAG by sending it a dictionary through an HTTP request.

Ability to run a DAG for any past date

There are situations wherein we change the logic behind running aggregations. We then have to get the values for older dates/prior runs using the updated math function. An example of that is when we process the telemetry data to come up with the daily user scores in our vehicle insurance application called SafeDrive. When we change the logic, we want to retrospectively recalculate some specific scores for older dates using our existing DAG. We created an airflow variable called `day_delta` to be used in these situations. We get the apt date for the current DAG run by subtracting `day_delta` value from the current date. This logic goes into our custom SQL queries which run and recalculate values.

--

--