I am your father… Data Lineage with Cloud Data Fusion

David Verdejo · Published in bluekiri · Jun 25, 2020

First of all, in case you don’t know what Cloud Data Fusion is, let’s review its definition from Google’s documentation: “Cloud Data Fusion is a fully managed, cloud-native, enterprise data integration service for quickly building and managing data pipelines”.

This service offers a full ETL solution that organizations can manage through a web UI to create and control their data pipelines, while Google takes care of the setup and maintenance of the underlying infrastructure. The main benefits are:

  • Better accessibility: users without development skills can create and maintain their own workflows when the organization does not have access to a data analyst team or that team is overloaded.
  • Reduced development time: through the UI, we can create an entire workflow in a few clicks.
  • Avoid vendor lock-in: you can run your workloads on Google Cloud Platform, on other cloud providers, or even on-premises, because the solution is based on the open-source project CDAP.

It’s time to introduce some basic concepts to help understand the solution:

  • Cloud Data Fusion instance: Unique deployment of Cloud Data Fusion that you create on GCP.
  • Execution environment: where your pipelines actually run. Under the hood, Cloud Data Fusion creates ephemeral Dataproc clusters, or you can reuse your existing Dataproc clusters.
  • Pipeline: the visual representation (logical pipeline) is what you create through the web UI. A planner then converts it into a physical pipeline, which is a collection of programs and services that read and write through the data abstraction layer. The data flow of a pipeline can be either batch or real-time.
  • Plugin: data sources, transformations, and data sinks are generically referred to as plugins. We can see them as customizable modules: we can build our application using the plugins included with Cloud Data Fusion (it provides by default plugins for sources, transforms, aggregates, sinks, error collectors, alert publishers, actions, and post-run actions) or develop our own.
  • Provisioner: responsible for creating and destroying the pipeline’s physical execution environment; in our case, the ephemeral Dataproc clusters.
  • Profiles: a profile assigns a provisioner and a set of configuration settings for that provisioner to our pipelines (useful to control execution time and costs).

But before we delve deeper into the Cloud Data Fusion details, note that there are other solutions on GCP that help us transform our data; just remember that Cloud Data Fusion is the only one that is a complete ETL solution.

  • Dataflow: Unified stream and batch data processing that’s serverless, fast, and cost-effective based on Apache Beam.
  • Dataprep: Cloud data service offered by Trifacta to visually explore, clean, and prepare data for analysis and machine learning executed on Dataflow workers.
  • Dataproc: Managed Spark and Hadoop service that lets you take advantage of open-source data tools for batch processing, querying, streaming, and machine learning. We can migrate our current data workflows into Dataproc without redevelopment (lift-and-shift).
  • BigQuery: Serverless, highly scalable, and cost-effective cloud data warehouse designed for business agility. Yes, I haven’t gone mad! Through federation, we can use SQL to clean and transform our data before loading it into BigQuery.

To help you choose, I want to share my decision map for picking the right transformation tool on GCP:

Let’s create our Data Fusion instance. Firstly, we have to check if the Cloud Data Fusion API has been enabled in our project.
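
If you prefer to script this check, here is a minimal sketch that asks the Service Usage API for the state of the Cloud Data Fusion API and enables it if needed; the project ID is a placeholder and the endpoint paths follow my reading of the Service Usage reference:

```python
# pip install google-auth requests
import google.auth
from google.auth.transport.requests import AuthorizedSession

# Hypothetical project ID; replace with your own.
PROJECT_ID = "my-gcp-project"
SERVICE = "datafusion.googleapis.com"

credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
session = AuthorizedSession(credentials)

# Check the current state of the Cloud Data Fusion API in this project.
service_url = (
    f"https://serviceusage.googleapis.com/v1/projects/{PROJECT_ID}/services/{SERVICE}"
)
state = session.get(service_url).json().get("state")
print(f"{SERVICE} is {state}")

# Enable it if it is not enabled yet.
if state != "ENABLED":
    session.post(f"{service_url}:enable")
```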

If the API is enabled, we can go to Data Fusion under the “Big Data” section

If we don’t have any instance, the page offers us the option to create one. Click on “Create an instance”

Then we have to enter the following data (for automation, a scripted alternative is sketched after these options):

  • Instance name
  • Description
  • Edition: you can choose between Basic or Enterprise (we cover this later in this section).
  • Region: choose the one that is closest to your original data source.

And in the “Advanced Options”:

  • Enable Private IP: connections to our instance will be established over a private VPC network and all traffic remains private (we will review this point after the instance creation).
  • Enable Stackdriver logging/monitoring service: to integrate our instance with Google’s logging and monitoring tools (take the charges for these services into account in your budget).
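
If you prefer to create the instance from a script instead of the form, the sketch below calls the Cloud Data Fusion REST API through an authorized session; the project, region, and instance values are placeholders, and the request fields reflect my reading of the API reference, so verify them before relying on this:

```python
# pip install google-auth requests
import google.auth
from google.auth.transport.requests import AuthorizedSession

# Hypothetical values; replace with your own.
PROJECT_ID = "my-gcp-project"
REGION = "europe-west1"
INSTANCE_ID = "datafusion-demo"

credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
session = AuthorizedSession(credentials)

url = (
    "https://datafusion.googleapis.com/v1/projects/"
    f"{PROJECT_ID}/locations/{REGION}/instances?instanceId={INSTANCE_ID}"
)
body = {
    "type": "BASIC",  # or "ENTERPRISE"
    "description": "Demo instance created from a script",
    "enableStackdriverLogging": True,
    "enableStackdriverMonitoring": True,
}

# The call returns a long-running operation; creation takes several minutes.
operation = session.post(url, json=body).json()
print(operation.get("name"))
```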

To decide which edition is right for you, check the differences between the options in the following link:

Simple conclusion: use Basic for developing/testing and Enterprise to go live.

There are differences in price too: the Basic license costs almost a third of the Enterprise license. On the other hand, when you execute your pipelines, Data Fusion creates a Dataproc cluster, and you are charged at the current Dataproc rates.

Review all the info and click on “Create” and …

Have a coffee, because the documentation suggests that it can take up to 30 minutes for the instance creation process to complete (not always; I have had instances ready in 15 minutes or less)

Let’s talk a little bit about Data Fusion networking during the instance creation. In our example, to keep it simple, we don’t check the “Enable Private IP” option in the “Advanced Options” (it requires extra permissions). This option creates a VPC Network Peering to establish network connectivity to your VPC network (review the VPC Network Peering limits). It allows Cloud Data Fusion to access resources on your network through private IP addresses, offering lower network latency, more secure access, and lower costs.

From a networking perspective, my recommendation is to peer with your organization’s shared VPC network to gain access to other GCP projects and to your on-premises resources through a Cloud VPN or a Dedicated Interconnect.

After a while, we have our instance available. To access it, click “View Instance” in the Action column

Then we access our Data Fusion instance through the web UI:

Let’s describe the upper bar:

  • Dashboard: for monitoring and reviewing the execution of the latest jobs
  • Hub: add reusable Cloud Data Fusion pipelines, plugins, and solutions. Note that we can connect to other cloud solutions from AWS, Azure, …
  • System Admin: access to system configuration and logs
  • Documentation
  • Edition: check our current edition

In the main panel:

  • Wrangle: to map and transform our data interactively
  • Integrate: opens Data Fusion Studio. This is the place where we can create our pipelines.
  • Discover and govern: data governance and data lineage
  • Monitor: see “Dashboard”
  • Manage: see “System Admin”

And finally, we have the lateral bar:

The only thing to highlight in this section is that, at the bottom, we can select the Namespace (“default” is created during the setup). A Namespace is a virtual partition of our Data Fusion instance where we can group applications, data, and metadata.
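
If you ever need to manage namespaces outside the UI, they are also exposed through the instance’s CDAP REST API. Below is a minimal sketch that creates a hypothetical “staging” namespace; the apiEndpoint field and the CDAP path follow my reading of the documentation, so double-check them against the current reference:

```python
# pip install google-auth requests
import google.auth
from google.auth.transport.requests import AuthorizedSession

# Hypothetical values; replace with your own.
PROJECT_ID = "my-gcp-project"
REGION = "europe-west1"
INSTANCE_ID = "datafusion-demo"

credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
session = AuthorizedSession(credentials)

# Look up the CDAP endpoint of the instance.
instance = session.get(
    "https://datafusion.googleapis.com/v1/projects/"
    f"{PROJECT_ID}/locations/{REGION}/instances/{INSTANCE_ID}"
).json()
api_endpoint = instance["apiEndpoint"]

# Create a "staging" namespace through the CDAP REST API.
resp = session.put(f"{api_endpoint}/v3/namespaces/staging")
print(resp.status_code)
```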

Let’s start playing with Data Fusion. Firstly, we need some data to play with. In this demonstration, we are going to use the following simple schema:

You can download the CSV data from GitHub and upload it to your Google Cloud Storage bucket:
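
If you prefer to script the upload instead of using the console, here is a minimal sketch with the google-cloud-storage client; the bucket and file names are placeholders:

```python
# pip install google-cloud-storage
from google.cloud import storage

# Hypothetical bucket and file names; replace with your own.
BUCKET_NAME = "my-datafusion-demo-bucket"
LOCAL_FILE = "employees.csv"

client = storage.Client()
bucket = client.bucket(BUCKET_NAME)

# Upload the CSV so Wrangler can read it from GCS.
blob = bucket.blob(f"input/{LOCAL_FILE}")
blob.upload_from_filename(LOCAL_FILE)
print(f"Uploaded to gs://{BUCKET_NAME}/{blob.name}")
```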

What is the next step? We want to explore our data. From the Data Fusion panel, select “Wrangle”

Navigate to our bucket and open the file

We can see the file content

The first step is to parse the file

Now, we can see our columns and, in the right panel, we can review the transformations

In the right panel, we can change to the “Columns” tab to observe the completion percentage (not null values) by column

Go to the “Insights” tab

In this step, we try to understand the data more deeply. Before implementing methods to clean or transform it, you definitely need a better idea of what the data is about.

Let’s start transforming our data. First, we want to extract the email domain

Create a custom pattern:

Rename the column and observe how new steps are added to our transformations:

Next, we are going to fill the null values in the gender column with “N/A” (not available)

We are going to drop the body column

And before moving on, we are going to check our schema:

Notice that employee_id is a string:

We need to change the field type to integer
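
For reference, the steps above end up as a Wrangler directive recipe. The sketch below collects an approximate version of that recipe in a Python list (the directive names follow the CDAP Wrangler documentation as I recall it, and the pattern-extraction step plus the generated column name in particular may differ from what the UI actually records):

```python
# Approximate Wrangler recipe for the transformations built in the UI above.
# Column names (body, email, gender, employee_id) come from the sample schema.
RECIPE = [
    "parse-as-csv :body ',' true",           # parse the CSV, first row as header
    "drop :body",                            # drop the raw body column
    "extract-regex-groups :email '@(.*)$'",  # pull the domain out of the email
    "rename :email_1_1 :email_domain",       # rename the generated column (name may differ)
    "fill-null-or-empty :gender 'N/A'",      # fill missing gender values with N/A
    "set-type :employee_id integer",         # change employee_id from string to integer
]
print("\n".join(RECIPE))
```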

At this point, if we are finished with all the transformations, we can click on “Create Pipeline”

It opens Data Fusion Studio to visually create our pipeline:

We are going to read the other CSV from GCS. Add a GCS source and click on “Properties”

Add the required info (don’t forget to define the output schema) and click on “Validate” to check that everything is fine (at every step, validate before moving on to the next one):

Next, we want to join the data from both files. Select “Analytics” in the left menu and add a “Joiner” and connect it to the outputs of the previous steps:

If we open the “Joiner”, we see that it complains that we have a duplicate field

Create an alias and click on Validate and update the output schema

And in the last step, we want to save the data to BigQuery. In the left menu, go to the “Sink” section, click on “BigQuery”, and connect the output of the “Joiner” to the input of this sink

Go to the Properties and add the required fields and “Validate”

We have our first pipeline. It is time to “Save” our job as a draft

Now, we can “Preview” our pipeline. It’s recommended that you “preview” the pipeline before deploying to production to verify that everything is fine.

And “Run” our pipeline (you can change the settings in “Configure”)

After the preview execution, you can review the logs

And then we can click on “Preview Data”

Finally, we are ready to deploy our pipeline

Once the pipeline is deployed (you can check the version in Details)

Before running the pipeline, we have the option to tune our Dataproc cluster (important if you want to control the execution time and the cost)

The moment of truth has now arrived… press “Run” button and you can see that it starts to provision our ephemeral Dataproc cluster in order to run the pipeline

Starting…Running… and finally Succeeded (we can see a “Summary” to compare with previous executions)
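
As a side note, runs don’t have to be triggered from the UI: a deployed batch pipeline can also be started through the instance’s CDAP REST API, for example from a scheduler. A minimal sketch, assuming a hypothetical pipeline named “employee_pipeline” in the default namespace (the workflow name and path follow my reading of the documentation):

```python
# pip install google-auth requests
import google.auth
from google.auth.transport.requests import AuthorizedSession

# Hypothetical values; replace with your own.
PROJECT_ID = "my-gcp-project"
REGION = "europe-west1"
INSTANCE_ID = "datafusion-demo"
PIPELINE_NAME = "employee_pipeline"

credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
session = AuthorizedSession(credentials)

# Look up the CDAP endpoint of the instance.
api_endpoint = session.get(
    "https://datafusion.googleapis.com/v1/projects/"
    f"{PROJECT_ID}/locations/{REGION}/instances/{INSTANCE_ID}"
).json()["apiEndpoint"]

# Start the batch pipeline (exposed as a CDAP workflow named DataPipelineWorkflow).
start_url = (
    f"{api_endpoint}/v3/namespaces/default/apps/{PIPELINE_NAME}"
    "/workflows/DataPipelineWorkflow/start"
)
resp = session.post(start_url)
print(resp.status_code)
```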

And we can check the destination table in BigQuery
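
If you would rather verify it from code than from the BigQuery console, here is a minimal sketch with the BigQuery Python client; the project, dataset, and table names are placeholders for whatever you configured in the sink:

```python
# pip install google-cloud-bigquery
from google.cloud import bigquery

# Hypothetical destination table; replace with the sink's target.
TABLE = "my-gcp-project.hr_demo.employees_joined"

client = bigquery.Client()

# Count the rows the pipeline loaded and peek at a few of them.
rows = client.query(f"SELECT COUNT(*) AS n FROM `{TABLE}`").result()
print(f"Rows loaded: {list(rows)[0].n}")

for row in client.query(f"SELECT * FROM `{TABLE}` LIMIT 5").result():
    print(dict(row))
```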

Now we realize that we have PII information (email field) in our destination table … we need to modify our pipeline to redact this information.

In our current situation, the best option is to modify the Wrangler step and mask the data in the email column

Too simple… let’s imagine a more complicated scenario: we modify our schema and want to add data from our internal ticket system.

The “text” column contains PII information (in this case email)

We are going to modify our pipeline to add the new dataset:

What GCP tool can help us to carry out this task? Solution: Cloud DLP.
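
To make the idea concrete before wiring it into the pipeline, here is a minimal sketch of how Cloud DLP redacts email addresses from free text with its Python client; the project ID and the sample text are placeholders, and this only illustrates what the DLP plugin will do for us inside Data Fusion:

```python
# pip install google-cloud-dlp
from google.cloud import dlp_v2

# Hypothetical project ID; replace with your own.
PROJECT_ID = "my-gcp-project"

dlp = dlp_v2.DlpServiceClient()

# Sample text similar to the "text" column of the ticket system.
item = {"value": "Ticket 4711: user john.doe@example.com cannot log in"}

# Look for email addresses only.
inspect_config = {"info_types": [{"name": "EMAIL_ADDRESS"}]}

# Replace every finding with its infoType name, e.g. [EMAIL_ADDRESS].
deidentify_config = {
    "info_type_transformations": {
        "transformations": [
            {"primitive_transformation": {"replace_with_info_type_config": {}}}
        ]
    }
}

response = dlp.deidentify_content(
    request={
        "parent": f"projects/{PROJECT_ID}",
        "deidentify_config": deidentify_config,
        "inspect_config": inspect_config,
        "item": item,
    }
)
print(response.item.value)  # Ticket 4711: user [EMAIL_ADDRESS] cannot log in
```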

Our first step will be to check whether we have to enable the Cloud DLP API in our project

When the API is enabled, create a template

Then go to IAM to grant the “DLP Administrator” role to the Cloud Data Fusion Service Account (service-project-number@gcp-sa-datafusion.iam.gserviceaccount.com)

Next step, we need to add the DLP plugin to our pipeline. From Data Fusion instance, go to “Hub” and select “Data Loss Prevention”

Now we need to edit our pipeline. It’s important to note that you can’t edit a deployed pipeline (all new pipelines need a unique name). We can duplicate a deployed pipeline, or we can edit the corresponding draft if we saved one (this is our case)

Now we can add the “Redact” step (included in the DLP plugin) to our pipeline

In the plugin properties, enable “Custom Template” and choose to apply it to the email field

Save the draft and click on “Preview” to validate the changes

And ready to deploy…

The last feature I want to review is data lineage. This feature requires the Enterprise edition, so we have to delete our current instance and create a new one with the Enterprise option (NOTE: don’t confuse this with the “Upgrade” button on the Data Fusion instance page, which upgrades the software version).

First of all, export your current work. In the Studio UI, click on “Export”.

Note: you can also use this option to export your code and add it to your version control tool.

Next, delete the current instance from the instance page

And create the new Enterprise instance

When the instance is ready, go to the instance page > “Studio”, then “Import” and “Deploy”

If you receive a message that there is a missing plugin, you have the option to add it.

And we are ready. Let’s imagine that we are planning to change the country field from ISO code to full country name

Before making the change, we want to evaluate its impact, and here is where data lineage can help us. Go to “Discover and govern”

In order to find the dependencies, we are going to search for “schema:country”

The results show that we have one dataset with this field

Click on the dataset and we can review the dataset schema

And finally, we can analyze the lineage of the country field and carry out the planned change in complete safety
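
For completeness, lineage is also exposed through the instance’s CDAP metadata API, which is handy if you want to automate this kind of impact analysis. The sketch below asks for dataset-level lineage of a hypothetical “employees_joined” dataset over the last 24 hours; the exact paths and parameters vary between CDAP versions, so treat it only as a starting point:

```python
# pip install google-auth requests
import time

import google.auth
from google.auth.transport.requests import AuthorizedSession

# Hypothetical values; replace with your own.
PROJECT_ID = "my-gcp-project"
REGION = "europe-west1"
INSTANCE_ID = "datafusion-demo"
DATASET = "employees_joined"

credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
session = AuthorizedSession(credentials)

# Look up the CDAP endpoint of the instance.
api_endpoint = session.get(
    "https://datafusion.googleapis.com/v1/projects/"
    f"{PROJECT_ID}/locations/{REGION}/instances/{INSTANCE_ID}"
).json()["apiEndpoint"]

# Dataset-level lineage for the last 24 hours.
now = int(time.time())
lineage = session.get(
    f"{api_endpoint}/v3/namespaces/default/datasets/{DATASET}/lineage",
    params={"start": now - 24 * 3600, "end": now},
).json()
print(lineage)
```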

To sum up, this tool is great for organizations that:

  • Want a fully managed ETL solution
  • Have users without development skills
  • Need a proof of concept or want to explore a dataset (and need a quick development cycle)

Could we replace our data engineer teams with this tool? The answer is NO. The main goal of the data engineer teams is to optimize the performance and cost of our big data ecosystem.

“If you have a problem, if no one else can help, and if you are on GCP, maybe you can hire … Google Data Fusion” (modified A-team opening narration)

PS: Psst, one secret… if you have a problem, maybe you should call the Bluekiri-Team
