Framework for building a configuration driven Data Lake on GCP using Data Fusion and Composer

Neha Joshi
Google Cloud - Community
6 min read · Jan 15, 2021

The first article in this series provided an overview of a data lake solution architecture using Data Fusion for data integration and Cloud Composer for orchestration.

In this article, I will provide an overview of the detailed solution design based on that architecture. This article assumes you have some basic understanding of GCP Data Fusion and Composer. If you are new to GCP, you can start by reading the previous article in this series to get an understanding of the different services used in the architecture before proceeding here.

Design Approach

The solution design described here provides a framework to ingest a large number of source objects through the use of simple configurations. Once the framework is developed, adding new sources or objects to the data lake ingestion only requires adding configurations for them.

I will publish the code for this framework in the near future. Look out for an update.

Design Components

The solution design comprises 4 broad components.

  • Data Fusion pipelines for data movement
  • Custom pre-ingestion and post-ingestion tasks
  • Configurations to provide inputs to reusable components and tasks
  • Composer DAGs to execute the custom tasks and to call Data Fusion pipelines based on configurations

Let me start with a high level view of the Composer DAG that orchestrates all the parts of the solution, and then provide insight into the different pieces of the solution in the following sections.

Composer DAG Structure

The Composer DAG is the workflow orchestrator. It broadly comprises the components shown in the image below.

Components of the Composer DAG

The DAG reads the configuration files (detailed in the next section) for information such as source and target details, and passes that information to the pre-ingestion and post-ingestion tasks as well as to the Data Fusion pipeline. The image below describes the overall flow.

Workflow process flow

Each pre-ingestion or post-ingestion task is a distinct Airflow task in the DAG and calls Python code that contains the logic of the task.

Based on the flow described above, below is a sample Composer DAG comprising pre-ingestion and post-ingestion tasks, along with calls to the Data Fusion pipeline.

Ingestion Configuration

Configuration in this solution is maintained at 3 levels:

Sample Environment Configuration

Environment Configuration holds details such as the GCP project ID, the Data Fusion instance and the GCS bucket.

Sample DAG Configuration

DAG Configuration provides the information required by the DAG for each source system.

Sample Task Configuration

Task Configuration specifies the inputs for the Data Fusion pipeline, for instance the source, the delimiter and the pipeline to be triggered.
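To make the three levels concrete, here is a minimal sketch of what such configurations might contain and how a DAG could combine them. The article does not state the file format or the key names, so the JSON layout, every key and every value below are assumptions made purely for illustration.

```python
import json

# Hypothetical Environment Configuration: shared by every DAG in an environment.
ENV_CONFIG = json.loads("""
{
  "project_id": "my-gcp-project",
  "datafusion_instance": "my-datafusion-instance",
  "datafusion_location": "us-central1",
  "landing_bucket": "my-landing-bucket"
}
""")

# Hypothetical DAG Configuration: one entry per source system.
DAG_CONFIG = json.loads("""
{
  "source_system": "sales",
  "dag_id": "ingest_sales",
  "target_dataset": "lake_sales"
}
""")

# Hypothetical Task Configuration: one entry per source object to ingest.
TASK_CONFIG = json.loads("""
[
  {"source_object": "orders", "delimiter": ",", "pipeline_name": "gcs_to_bq_delimited"},
  {"source_object": "customers", "delimiter": "|", "pipeline_name": "gcs_to_bq_delimited"}
]
""")


def resolve_task_settings(env, dag_cfg, task_cfg):
    """Merge the three levels into one dict.

    Later (more specific) levels override earlier (more general) ones
    if the same key appears more than once.
    """
    return {**env, **dag_cfg, **task_cfg}


if __name__ == "__main__":
    for task_cfg in TASK_CONFIG:
        print(resolve_task_settings(ENV_CONFIG, DAG_CONFIG, task_cfg))
```

With a layout like this, onboarding a new source object would amount to appending one entry to the task-level list, which is the property the framework is designed around.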

Dynamic Task Generation in DAGs

The DAG is written to dynamically generate Composer tasks based on the Task Configuration file. Each task generated this way triggers the corresponding Data Fusion pipeline using the source specifications provided.
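The pattern can be sketched as a loop over the task configuration entries that creates one operator per entry. This is not the author's code: the configuration keys, the task naming scheme and the function names are assumptions, and the import paths are those of Airflow 2.x with the Google provider package installed. Airflow is imported inside the builder function so that the helper above it can be exercised without an Airflow installation.

```python
from datetime import datetime

# Hypothetical configuration contents; key names are illustrative only.
ENV_CONFIG = {
    "datafusion_instance": "my-datafusion-instance",
    "datafusion_location": "us-central1",
}
TASK_CONFIG = [
    {"source_object": "orders", "pipeline_name": "gcs_to_bq_delimited"},
    {"source_object": "customers", "pipeline_name": "gcs_to_bq_delimited"},
]


def task_id_for(task_cfg):
    """Derive a unique Airflow task id from a task configuration entry."""
    return f"load_{task_cfg['source_object']}"


def build_ingestion_dag(dag_id, env, task_config):
    """Create a DAG with one Data Fusion task per configuration entry."""
    # Lazy imports (Airflow 2.x paths): only needed when the DAG is built.
    from airflow import DAG
    from airflow.providers.google.cloud.operators.datafusion import (
        CloudDataFusionStartPipelineOperator,
    )

    # Scheduling arguments are omitted here; set them for your Airflow version.
    with DAG(dag_id=dag_id, start_date=datetime(2021, 1, 1), catchup=False) as dag:
        for task_cfg in task_config:
            # Operators created inside the "with" block attach to the DAG.
            CloudDataFusionStartPipelineOperator(
                task_id=task_id_for(task_cfg),
                pipeline_name=task_cfg["pipeline_name"],
                instance_name=env["datafusion_instance"],
                location=env["datafusion_location"],
            )
    return dag
```

In an actual DAG file the result would be assigned at module level, for example `dag = build_ingestion_dag("ingest_sales", ENV_CONFIG, TASK_CONFIG)`, so that the scheduler can discover it.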

Data Fusion Pipeline and Custom Plugins

Shown below is a Data Fusion pipeline that loads files from GCS to BigQuery. This is a simple example, and you can build UI-driven pipelines with additional logic for a variety of data sources as your requirements dictate.

The pipeline above uses the Wrangler plugin to parse the file layout. You can even go a step further and write your own **custom plugin that can read multiple files, each with a different layout, parse them on the fly and load them into their respective targets all in one go. Writing a custom plugin does call for Java programming skills though, and writing generic auto-parsing logic can get tricky as you discover new scenarios in your files.

**Data Fusion provides a variety of source, sink and transformation plugins out of the box. If you need to perform certain transformations that are very custom to your needs and not available out of the box, you can also write your own custom plugins and use them in your pipelines.

Calling Data Fusion Pipelines from Composer

Now that we have insight into the tasks that the Data Fusion pipeline and the Composer DAG need to perform, how does Composer call the Data Fusion pipeline?

CloudDataFusionStartPipelineOperator allows triggering a Data Fusion pipeline from an Airflow DAG. There is more information about this operator in this GCP blog.

Sample Airflow Task to call a Data Fusion pipeline
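A minimal sketch of such a task follows. The operator and its `pipeline_name`, `instance_name`, `location`, `project_id` and `runtime_args` parameters come from the Airflow Google provider (Airflow 2.x import path); everything else is an assumption for illustration, including the configuration keys, the helper names and the runtime argument names, which would have to match macros defined in your own pipeline.

```python
# Hypothetical configuration values; key names are illustrative only.
ENV = {
    "project_id": "my-gcp-project",
    "datafusion_instance": "my-datafusion-instance",
    "datafusion_location": "us-central1",
    "landing_bucket": "my-landing-bucket",
}
TASK = {
    "source_system": "sales",
    "source_object": "orders",
    "delimiter": ",",
    "pipeline_name": "gcs_to_bq_delimited",
    "target_dataset": "lake_sales",
}


def datafusion_task_kwargs(env, task_cfg):
    """Translate configuration into keyword arguments for the operator."""
    input_path = (
        f"gs://{env['landing_bucket']}/"
        f"{task_cfg['source_system']}/{task_cfg['source_object']}/"
    )
    return {
        "task_id": f"start_pipeline_{task_cfg['source_object']}",
        "project_id": env["project_id"],
        "location": env["datafusion_location"],
        "instance_name": env["datafusion_instance"],
        "pipeline_name": task_cfg["pipeline_name"],
        # Runtime arguments fill macros in the deployed pipeline.
        # These names are assumptions, not names used by the article.
        "runtime_args": {
            "input.path": input_path,
            "delimiter": task_cfg["delimiter"],
            "output.dataset": task_cfg["target_dataset"],
            "output.table": task_cfg["source_object"],
        },
    }


def make_start_pipeline_task(env, task_cfg, dag):
    """Create the Airflow task that starts the Data Fusion pipeline."""
    # Lazy import (Airflow 2.x path) so the helper above runs without Airflow.
    from airflow.providers.google.cloud.operators.datafusion import (
        CloudDataFusionStartPipelineOperator,
    )

    return CloudDataFusionStartPipelineOperator(
        dag=dag, **datafusion_task_kwargs(env, task_cfg)
    )
```

Keeping the translation from configuration to operator arguments in a plain function makes it easy to unit test without a running Composer environment.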

Calling Python executables for pre-ingestion and post-ingestion tasks from Composer

Often, the data ingestion workflow needs custom tasks that do not belong in, or cannot be performed in, the ETL tool. Typical examples include customised cleansing of source files to remove incompatible special characters from the data, updating column descriptions in the target table to support data discoverability, archiving processed files based on certain conditions, and customised logging of the workflow status in a custom audit table at the end of each task in the DAG shown earlier.

These tasks can be coded in Python, which can then be called from within Composer with the help of the Airflow PythonOperator. I will not go into the details of this operator since there are plenty of great articles already available on it.

Snippet of Python operator to execute a custom task
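As an illustration, a pre-ingestion cleansing step could be wrapped in a PythonOperator roughly as follows. This is a sketch under stated assumptions rather than the author's code: the cleansing rule (dropping control characters), the function names, the task id and the use of local file paths are all hypothetical, and reading from or writing to GCS is left out. The import path is the Airflow 2.x one.

```python
import re

# Control characters except tab, line feed and carriage return.
# Which characters count as "incompatible" is an assumption here.
_CONTROL_CHARS = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]")


def cleanse_text(text):
    """Remove control characters that commonly break delimited-file parsing."""
    return _CONTROL_CHARS.sub("", text)


def cleanse_file(src_path, dst_path):
    """Write a cleansed copy of src_path to dst_path, line by line."""
    with open(src_path, encoding="utf-8", newline="") as src, \
            open(dst_path, "w", encoding="utf-8", newline="") as dst:
        for line in src:
            dst.write(cleanse_text(line))


def make_cleanse_task(dag, src_path, dst_path):
    """Create the Airflow task that runs cleanse_file."""
    # Lazy import (Airflow 2.x path) so the functions above run without Airflow.
    from airflow.operators.python import PythonOperator

    return PythonOperator(
        task_id="cleanse_source_file",  # hypothetical task name
        python_callable=cleanse_file,
        op_kwargs={"src_path": src_path, "dst_path": dst_path},
        dag=dag,
    )
```

Because the task logic lives in ordinary Python functions, it can be tested on its own and reused across DAGs, with the operator acting only as a thin wrapper.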

Key Takeaways

The solution design described above provides a framework to ingest data from a hybrid ecosystem into the data lake. It does so by making use of simple configurations to provide details about the environment, sources and targets, as well as details of the Data Fusion pipeline to be executed. Extending the data lake to add more sources is easy and only requires configuration to be added for the new source objects.

Data Fusion pipelines move the data from source to target. Custom tasks that do not belong in the ETL tool, or cannot be performed in it, can be written in Python and integrated into the Composer DAG for end-to-end orchestration of the workflow.

What next?

One of the post-ingestion tasks you may have noticed in the sample DAG screenshot is ‘apply_data_dictionary’. This task updates the business metadata and column descriptions of the tables loaded into the data lake.

Ensuring data discoverability is an important aspect of deriving value from the data available in the data lake, and business metadata is a key enabler to that end. In the next article, I will go into further detail on how you can enable data discoverability on your data platform on GCP.


Neha is a Cloud Consultant in Google Cloud’s Professional Services group. She specialises in Data and Advanced Analytics, helping large enterprises adopt GCP.