How Does Tokopedia Take Airflow to the Next Level?

Sulung Suganda
Published in Tokopedia Data · Aug 11, 2020

The Data Engineering team at Tokopedia is the backbone unit for the whole Data team. We provide a highly available system to run complex use cases and support all business units in gathering insights from the data. These insights are then used in decision-making or even for building predictions. We believe the data can support business use cases as well.

In the first phase, we used Airflow with “physical” DAG (Directed Acyclic Graph) Python files to support every case. You can imagine how arduous this was, as we had to create a DAG Python file for every single request while also keeping and managing numerous scripts. Whether the request was for a table migration, an aggregated table, or a metric, we had to support it by creating the DAG manually in this phase.

“Physical” DAG (Directed Acyclic Graph) Python files

Faced with too many requests from business units, such as migration tables and aggregated tables, we finally made a plan to build an automation process in Airflow, as we did not want to maintain a separate script for every single request. One of the reasons was to reduce code bloat in the repository.

Basically, the main pipelines were for migrating data from databases, daily reporting, aggregated tables, metrics creation, and dumping data to external storage. So, in the first phase, we collected our cases and converted them into a metadata design.

In the first release, the Airflow automation job was intended to reduce manual intervention in DAG creation for each request, for example:

Task Automation Workflow

Generator Tasks Workflow

The metadata cache is synchronized every 3 hours, or it can be refreshed manually. The Task Creator (Python) then connects to the metadata cache to get the task mapping and generates the tasks in a loop for each DAG. At the same time, the Task Creator combines all of the functions in this feature: BigQuery to BigQuery, BigQuery to Google Cloud Storage, and BigQuery to BigTable.

The Task Creator generates the tasks for each DAG in one loop, so as it creates each task it can also set the dependencies between tasks. You can find how to generate DAGs dynamically in Airflow in this article; we do it the same way.
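To make the idea concrete, here is a minimal sketch of that kind of metadata-driven loop. The metadata layout, task names, and use of DummyOperator are hypothetical illustrations (not our actual Task Creator), written against the Airflow 1.x API that was current at the time.

```python
# Minimal sketch of metadata-driven DAG generation (hypothetical metadata layout).
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Assume this dict is what the synchronized metadata cache exposes
# (one entry per DAG, built from main_dag plus the task tables).
METADATA_CACHE = {
    "daily_order_metrics": {
        "schedule_interval": "0 2 * * *",
        "start_date": datetime(2020, 8, 1),
        "tasks": [
            {"task_id": "migrate_orders", "upstream": []},
            {"task_id": "aggregate_orders", "upstream": ["migrate_orders"]},
        ],
    },
}


def create_dag(dag_id, config):
    """Build one DAG and its tasks from a single metadata entry."""
    dag = DAG(
        dag_id=dag_id,
        schedule_interval=config["schedule_interval"],
        start_date=config["start_date"],
        catchup=False,
    )
    tasks = {}
    # One loop: create each task, then wire its upstream dependencies,
    # assuming the metadata lists tasks in dependency order. A real Task
    # Creator would pick the operator type (BashOperator, BigQueryOperator,
    # ...) from the metadata instead of DummyOperator.
    for task_cfg in config["tasks"]:
        task = DummyOperator(task_id=task_cfg["task_id"], dag=dag)
        tasks[task_cfg["task_id"]] = task
        for upstream_id in task_cfg["upstream"]:
            tasks[upstream_id] >> task
    return dag


# Registering each generated DAG in globals() is what makes Airflow pick it up.
for dag_id, config in METADATA_CACHE.items():
    globals()[dag_id] = create_dag(dag_id, config)
```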

Task Metadata Architecture
  • database_connection table defines the target databases to be migrated. It contains fields such as the target host, the database name, and even extra parameters for the connection string. For security reasons, every sensitive key in the metadata is encrypted.
  • main_dag table determines the task groups and where each migration task is scheduled. It holds all of the configuration needed to automate the DAG parameters in Airflow, e.g. schedule interval, DAG name, DAG description, start time, etc.
  • dataflow_config table defines the Dataflow machine type used for the slave migration and BigQuery-to-BigTable pipelines. We can define how many workers a specific task gets, as well as the disk size and even the disk type.
  • sub_dag table wraps multiple tasks in a SubDagOperator. This feature is very useful when we have a big task, because with the sub_dag table we can split it into smaller tasks and distribute their execution in parallel (see the sketch after this list).
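As an illustration only, the metadata for one migration might look something like the rows below. All field names and values here are hypothetical, not the actual Tokopedia schema.

```python
# Hypothetical example rows for the metadata tables described above.

database_connection = {
    "conn_id": "orders_slave",
    "target_host": "10.0.0.12",        # sensitive values are stored encrypted
    "database_name": "orders",
    "extra_params": "sslmode=require",
}

main_dag = {
    "dag_name": "daily_order_migration",
    "dag_description": "Migrate order tables from the slave DB to BigQuery",
    "schedule_interval": "0 2 * * *",
    "start_time": "2020-08-01",
}

dataflow_config = {
    "machine_type": "n1-standard-2",
    "num_workers": 4,
    "disk_size_gb": 50,
    "disk_type": "pd-ssd",
}

sub_dag = {
    "parent_dag": "daily_order_migration",
    "sub_dag_name": "migrate_big_order_table",
    # tasks inside this SubDag run in parallel instead of as one big task
    "task_ids": ["orders_part_1", "orders_part_2", "orders_part_3"],
}
```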

With that, we resolved the manual intervention needed to create each pipeline. We have now implemented this in the production environment, where the following pipelines, which can also depend on each other, are supported:

  1. Slave (PostgreSQL & MySQL) to BigQuery
    The slave_to_bq table is the main table for migrating data from slave databases (MySQL and PostgreSQL) to BigQuery. Slave migration to BigQuery uses a BashOperator in Airflow to run an Apache Beam SDK application and submit the job to Google Cloud Dataflow. The Task Creator combines all of the config in the metadata and translates it into parameters for the Apache Beam job. In this pipeline, we can set the priority weight for each table migration as well.
  2. BigQuery to BigQuery
    The bq_to_bq table generates the BigQueryOperator task. We can append the results of a query to an existing table or overwrite the table with the results. The job depends on the BigQuery quota itself; sometimes we exceed our quota, and because of that we need to split the tasks.
  3. BigQuery to Google Cloud Storage
    The bq_to_gs table generates the BigQueryToCloudStorageOperator task. We can export a table to Google Cloud Storage in supported file formats such as AVRO, CSV, and NEWLINE_DELIMITED_JSON.
  4. BigQuery to BigTable
    The bq_to_bt table migrates data from BigQuery to BigTable, with a similar concept to the slave migration to BigQuery. The task uses a BashOperator in Airflow to run an Apache Beam SDK application and submit the job to Google Cloud Dataflow as the migration engine. A sketch of how these four pipeline types map to Airflow operators follows this list.
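Below is a hedged sketch of how a Task Creator could translate these metadata rows into the operators named above, using Airflow 1.x import paths. The DAG name, the beam_migration.py script, and all parameter values are assumptions for illustration, not our production configuration.

```python
# Hypothetical mapping from pipeline metadata to Airflow 1.x operators.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator

dag = DAG(
    dag_id="pipeline_examples",
    schedule_interval="0 2 * * *",
    start_date=datetime(2020, 8, 1),
    catchup=False,
)

# 1. slave_to_bq (and similarly 4. bq_to_bt): submit an Apache Beam job to
#    Google Cloud Dataflow via a shell command; the flags would come from
#    the dataflow_config metadata. beam_migration.py is a placeholder name.
slave_to_bq = BashOperator(
    task_id="slave_to_bq_orders",
    bash_command=(
        "python beam_migration.py "
        "--runner=DataflowRunner --num_workers=4 --machine_type=n1-standard-2 "
        "--source_table=orders --target_table=project.dataset.orders"
    ),
    dag=dag,
)

# 2. bq_to_bq: append query results to an existing table or overwrite it.
bq_to_bq = BigQueryOperator(
    task_id="bq_to_bq_order_metrics",
    sql="SELECT order_date, COUNT(*) AS total FROM `project.dataset.orders` GROUP BY order_date",
    destination_dataset_table="project.dataset.order_metrics",
    write_disposition="WRITE_TRUNCATE",  # or WRITE_APPEND to add to an existing table
    use_legacy_sql=False,
    dag=dag,
)

# 3. bq_to_gs: export a table to Google Cloud Storage in a supported format.
bq_to_gs = BigQueryToCloudStorageOperator(
    task_id="bq_to_gs_order_metrics",
    source_project_dataset_table="project.dataset.order_metrics",
    destination_cloud_storage_uris=["gs://example-bucket/order_metrics/*.avro"],
    export_format="AVRO",
    dag=dag,
)

# Dependencies between pipelines, just as the metadata can declare them.
slave_to_bq >> bq_to_bq >> bq_to_gs
```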

With this Airflow automation pipeline, Business Analysts at Tokopedia can now improve an existing pipeline or integrate a new one in only 5 minutes. Today, we schedule more than 1,000 DAGs and more than 9,000 tasks every day.
