Building a Data Pipeline with Google Cloud Data Fusion

Marcelo Marques
Aug 5, 2020

A data pipeline basically moves data from one place to another. Along the way, that data may or may not be changed, processed, enriched…

For companies that work with large amounts of data, this is a basic job. Data is received, transformed, enriched with other data if needed, moved to a data lake or some other destination, and most of the time it ends up behind a beautiful dashboard.

Google Cloud offers the Cloud Data Fusion service, which is basically a service for efficiently building ETL/ELT data pipelines. It uses a Cloud Dataproc cluster to perform all the transforms in the pipeline. Let's see how it works with a sample.

Let's use one of Google's most popular samples: the NYC Taxi data!

First things first, let's enable the API for Data Fusion. In Cloud Shell, just run:

gcloud services enable datafusion.googleapis.com

Now let's go to the Data Fusion page. Click on "Create instance", enter whatever name you want for the instance, and click Create. The instance should take about 15 minutes to be ready. 😭
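If you prefer the command line, the instance can also be created from Cloud Shell. A minimal sketch, assuming the beta gcloud data-fusion commands are available in your SDK, and using us-central1 and my-fusion-instance as placeholder values:

gcloud beta data-fusion instances create my-fusion-instance \
  --location=us-central1 \
  --edition=basic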

Still there? Good! OK, let's move on. You need to grant a permission to the service account created by Data Fusion. To do this, click the instance name and copy the service account.
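If you'd rather grab it from Cloud Shell, the service account is also part of the instance details. A sketch, again assuming the placeholder instance name and region used above:

gcloud beta data-fusion instances describe my-fusion-instance \
  --location=us-central1 \
  --format='value(serviceAccount)'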

Open Cloud Shell and run:

gcloud projects add-iam-policy-binding { YOUR PROJECT ID } \
  --member=serviceAccount:{ YOUR SERVICE ACCOUNT } \
  --role=roles/datafusion.serviceAgent

Cool! Let's load the data now. Still in Cloud Shell, run:

export BUCKET={ YOUR PROJECT ID OR BUCKET NAME }
gsutil mb gs://$BUCKET
gsutil cp gs://cloud-training/OCBL017/ny-taxi-2018-sample.csv gs://$BUCKET
gsutil mb gs://$BUCKET-temp
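A quick sanity check that the sample file and the temp bucket are in place (plain gsutil, nothing Data Fusion specific). The first command should list ny-taxi-2018-sample.csv, the second should show the temp bucket:

gsutil ls gs://$BUCKET
gsutil ls -b gs://$BUCKET-temp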

Click the View Instance link on the Cloud Data Fusion instances page, or on the details page of an instance. If prompted to take a tour of the service, click No, Thanks. You should now be in the Cloud Data Fusion UI. Note: you may need to reload or refresh the Cloud Data Fusion UI pages if they don't load right away.

Click on Wrangler. Wrangler is an interactive, visual tool that lets you see the effects of transformations on a small subset of your data before dispatching large, parallel-processing jobs on the entire dataset. Select Cloud Storage Default under Google Cloud Storage.

Click the bucket you created earlier and select the ny-taxi sample file.

Let's clean and change a few things on this file.

  • To the left of the body column, click the Down arrow.
  • Click Parse > CSV, select Set first row as header and then click Apply. The data splits into multiple columns.
  • Because the body column isn't needed anymore, click the Down arrow next to the body column and choose Delete column.
  • You’ll notice that all of the column types have been loaded in as String. Click the Down arrow next to the trip_distance column, select Change data type and then click on Float. Repeat for the total_amount column.
  • If you look at the data closely, you may find some anomalies, such as negative trip distances. You can filter out those negative values in Wrangler. Click the Down arrow next to the trip_distance column and select Filter. Choose if Custom condition and enter >0.0.

The basic data cleansing is now complete, and you've run transformations on a subset of your data. You can now create a batch pipeline to run transformations on all your data.

  • On the upper-right side of the Google Cloud Fusion UI, click Create a Pipeline.
  • In the dialog that appears, select Batch pipeline.

In the Data Pipelines UI, you will see a GCSFile source node connected to a Wrangler node. The Wrangler node contains all the transformations you applied in the Wrangler view captured as directive grammar. Hover over the Wrangler node and select Properties.
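For reference, the recipe inside the Wrangler node is just a list of directives. With the steps above it should look roughly like the sketch below (treat the exact directive names and arguments as approximate; they can vary slightly between Wrangler versions):

parse-as-csv :body ',' true
drop body
set-type :trip_distance float
set-type :total_amount float
filter-rows-on condition-false trip_distance > 0.0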

This is where you can change other things if needed. Let's delete the "extra" column. Easy: just click the trash icon.

After this, just close the Wrangler UI by pressing the "X" in the right corner.

The taxi data contains several cryptic columns such as pickup_location_id, that aren't immediately transparent to an analyst. You are going to add a data source to the pipeline that maps the pickup_location_id column to a relevant location name. The mapping information will be stored in a BigQuery table.

In a separate tab, open the BigQuery UI in the GCP Console. Click Done on the ‘Welcome to BigQuery in the Cloud Console’ launch page.

In the left pane, in the Resources section, click your GCP Project ID and select Create dataset.

In the Dataset ID field, type trips and then click Create dataset.

To create the desired table in the newly created dataset, navigate to More > Query Settings. This process will ensure you can access your table from Cloud Data Fusion.

Select the option Set a destination table for query results. Under Table name, enter zone_id_mapping. Select Save.

Run this query in the BigQuery editor:

SELECT zone_id, zone_name, borough FROM `bigquery-public-data.new_york_taxi_trips.taxi_zone_geom`
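If you prefer to stay in Cloud Shell, the same dataset and destination table can be created with the bq CLI. A sketch, mirroring the names used above:

bq mk --dataset trips
bq query --use_legacy_sql=false \
  --destination_table=trips.zone_id_mapping \
  'SELECT zone_id, zone_name, borough FROM `bigquery-public-data.new_york_taxi_trips.taxi_zone_geom`'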

You should get a result with the zone_id, zone_name and borough columns, and thanks to the query settings above it will be saved into the trips.zone_id_mapping table.

Now you will add a source to your pipeline to access this BigQuery table. Return to the tab where you have Cloud Data Fusion open. From the Plugin palette on the left, select BigQuery from the Source section. A BigQuery source node appears on the canvas alongside the two other nodes.

Click on the new BigQuery source node and click Properties.

To configure the Reference Name, enter zone_mapping, which is used to identify this data source for lineage purposes. The BigQuery Dataset and Table configurations are the dataset and table you set up in BigQuery a few steps earlier: trips and zone_id_mapping. For Temporary Bucket Name, input the name of your project followed by "-temp", which corresponds to the temp bucket you created earlier.

To populate the schema of this table from BigQuery, click Get Schema. The fields will appear on the right side of the wizard.

Click the X to return to the Data Fusion pipeline page.

Now it's time to join the two data sources — taxi trip data and zone names.

Under the Analytics section in the Plugin Palette, choose Joiner. A Joiner node appears on the canvas.

To connect the Wrangler node and the BigQuery node to the Joiner node: Drag a connection arrow > on the right edge of the source node and drop on the destination node. Easy, no?

Let's configure the Joiner node:

  • Click Properties of Joiner.
  • Leave the label as Joiner.
  • Change the Join Type to Inner
  • Set the Join Condition to join the pickup_location_id column in the Wrangler node to the zone_id column in the BigQuery node.

In the Output Schema table on the right, remove the zone_id and pickup_location_id fields by clicking the red garbage can icon, then go back to the Data Fusion pipeline page.
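Conceptually, the Joiner plus the two dropped fields behaves like a plain inner join in SQL. A rough sketch of what it computes (taxi_trips here is just a placeholder for the wrangled CSV data, not a real table):

SELECT t.* EXCEPT (pickup_location_id), z.zone_name, z.borough
FROM taxi_trips AS t
INNER JOIN zone_id_mapping AS z
ON t.pickup_location_id = z.zone_id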

Let's store the result to a sink.

In the Sink section of the Plugin Palette, choose BigQuery.

Connect the Joiner node to the BigQuery node.
Drag a connection arrow > on the right edge of the source node and drop on the destination node.

Open the BigQuery node by clicking on it and then selecting Properties. You will use a configuration that's similar to the existing BigQuery source. Provide bq_insert for the Reference Name field, use trips for the Dataset, and the name of your project followed by "-temp" as the Temporary Bucket Name. You will write to a new table that will be created for this pipeline execution: in the Table field, enter trips_pickup_name.

Great! Let's now deploy and run the pipeline!!!

Name your pipeline in the upper-left corner of the Data Fusion UI and click OK.

Now you will deploy the pipeline. In the upper-right corner of the page, click Deploy.

Things are now ready, let's run!!!

When you run a pipeline, Cloud Data Fusion provisions an ephemeral Cloud Dataproc cluster, runs the pipeline, and then tears down the cluster. This could take a few minutes. You can observe the status of the pipeline transition from Provisioning to Starting and from Starting to Running to Succeeded during this time.
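If you're curious about that ephemeral cluster, you can watch it appear and disappear from Cloud Shell while the pipeline runs (adjust the region to wherever your instance lives):

gcloud dataproc clusters list --region=us-central1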

You can track the inputs and outputs of each node in the UI.

GREAT! You have your pipeline working!! Let's check the results in BigQuery! Return to the tab where you have BigQuery open and run the query below to see the values in the trips_pickup_name table.

SELECT * FROM `trips.trips_pickup_name`
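From there you can already play with the joined data. For example, an illustrative query counting trips per pickup zone:

SELECT zone_name, COUNT(*) AS trips
FROM `trips.trips_pickup_name`
GROUP BY zone_name
ORDER BY trips DESC
LIMIT 10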

That's it, guys! Most of the job here is understanding the business. After that, you just need to snap the pieces together like LEGO.

Stay safe!! :)
