Join, Group By and Aggregate in Cloud Data Fusion

Stéphane Fréchette
Google Cloud - Community
9 min readNov 24, 2019

Good news! Cloud Data Fusion is now GA. Announced at Google Next ’19 UK on November 21, 2019 Cloud Data Fusion is a fully managed, cloud-native, enterprise data integration service for quickly building and managing data pipelines. Cloud Data Fusion web UI allows you to build scalable data integration solutions to clean, prepare, blend, transfer, and transform data, without having to manage the infrastructure. Cloud Data Fusion is powered by the open source project CDAP.

How to get started with Cloud Data Fusion? This post shows you how to simply build and use the Wrangler and Data Pipelines features in Cloud Data Fusion to clean, transform and process flight data.

ETL Process
The diagram shows the transformations which are going to take place. This will be to read the two files, transform the data and loading it into one output; Total Flights per Airline.

Objectives

  • Connect Cloud Data Fusion to data sources
  • Apply basic transformations
  • Join and Group By data sources
  • Write to a sink

You are ready to begin!

Log on to the GCP Console

First go to the GCP console and log in using your Google account.

Select or Create a GCP project

You need to select a project. If you don’t have any projects then go to the project selector page to create one — For this exercise I have created and using a specific project named flights-analysis. I highly recommend you create a new project for this walkthrough. Refer to Creating your project if you need assistance.

Create GCS bucket and copy data

You need data! The two small datasets are located in a GCS bucket that you will need to copy to your own bucket.

First create your bucket, you can do this by typing bucket in the resources and products search field and select Create bucket.

Once on the Create a bucket page, provide the name of your bucket (1), keep in mind that these are globally unique and for the purpose of demonstration I have inputted flights-analysis-data2. Choose Region (2) for Location type and select your desired Location (3), I have chosen northamerica-northeast1 for demonstration, then click Create (4) to create your bucket.

Activate Cloud Shell
You now need to Active Cloud Shell, In GCP console, on the top right toolbar, click the Open Cloud Shell button. You can click Continue immediately when the dialog box opens.

It takes a few moments to provision and connect to the environment. When you are connected, you are already authenticated, and the project is set to your PROJECT_ID.

The output is similar to the following:

You need to issue the following commands to copy the required sample files and structure to your GCS bucket. Replace [BUCKET_NAME] with the name of the bucket you created earlier.

gsutil cp gs://flights-analysis-sample-data/input/airlines.csv gs://[BUCKET_NAME]]/input/ gsutil cp gs://flights-analysis-sample-data/input/flights_small.csv gs://[BUCKET_NAME]/input/

The output is similar to the following:

Create a Cloud Data Fusion instance

You are now ready to create your Cloud Data Fusion instance.

  1. Go to the Cloud Data Fusion page, you can do this by typing data fusion in the resources and products search field and select Data Fusion.
  1. If the Cloud Data Fusion API is not already enabled, you will have to enable it by clicking Enable.
    This might take a while to complete.
  1. Provide an Instance name (1), select your Region (2), select Basic for Edition (3), and click Create (4) to deploy your instance. For the purpose of this exercise I have chosen flights_data-etl for the name of my instance and chosen northamerica-northeast1 as my region. You can supply your own values for these properties.
    Note: This will take several minutes to complete (be patient).

Building the data pipeline

  1. Once in the Studio canvas, you are now ready to build the data pipeline. Start by selecting or make sure you are in Source (1) view , click GCS (2) source — this will add a GCS source on the canvas, then click Properties (3) from the GCS Source to continue.
  1. The GCS Properties configuration page shows up. Assign the following value for Label (1): “GCS — Flights Data”, then input the following value for Reference Name (2): “gcs_flights_data”, provide the Path (3) to the GCS Bucket you have created earlier, where you have stored the flight_small.csv file. For the purpose of this exercise I have include mine, make sure to supply your appropriate path (if you input what is provided in this image, the validation won’t succeed). Click Validate (4) to validate all properties, you should see in green No errors found, finally click X (5) to close/save the GSC properties.
  1. The Wrangler Properties configuration page shows up. Assign the following value for Label (1): “Wrangler — Flights Data”, then click Wrangle (2) to continue.
  1. You will need to select/load data, choose the flights_small.csv file that is located in your GCS bucket you created earlier to continue.
  1. Next are a series of steps to parse and remove unwanted columns. Click the dropdown [Column transformations] on the body (1) column, select Parse -> CSV (2), select Comma (3) as the delimiter, check Set first row as header (4), then click Apply (5) to continue.
  1. You will now notice that the Recipe box as been populated with the directives you just did in the Wrangling activities. Click Validate (1) to validate all properties, you should see in green No errors found, finally click X (2) to close/save the Wrangler properties.
  1. The GCS Properties configuration page shows up. Assign the following value for Label (1): “GCS — Airlines Data”, then input the following value for Reference Name (2): “gcs_airlines_data”, provide the Path (3) to the GCS Bucket you have created earlier, where you have stored the airlines.csv file. For the purpose of this exercise I have include mine, make sure to supply your appropriate path (if you input what is provided in this image, the validation won’t succeed). Click Validate (4) to validate all properties, you should see in green No errors found, finally click X (5) to close/save the GSC properties.
  1. The Wrangler Properties configuration page shows up. Assign the following value for Label (1): “Wrangler — Airlines Data”, then click Wrangle (2) to continue.
  1. Like you did earlier you need to select/load data, choose the airlines.csv file that is located in your GCS bucket to continue.
  1. Next are a series of steps to parse and remove unwanted columns. Click the dropdown [Column transformations] on the body (1) column, select Parse -> CSV (2), select Comma (3) as the delimiter, check Set first row as header (4), then click Apply (5) to continue.
  1. Again you will now notice that the Recipe box as been populated with the directives you just did in the Wrangling activities. Click Validate (1) to validate all properties, you should see in green No errors found, finally click X (2) to close/save the Wrangler properties.
  1. The Joiner Properties configuration box shows up. In the Join — Fields section, expand the Wrangler — Airlines Data (1) and uncheck Code (2) and assign the following value “Airline_name” to the Description Alias (3), select Inner (4) for the Join Type, in the Join Condition (5) section select the value “Airline” for Wrangler — Flight Data, and “Code” for Wrangler — Airlines Data, click Get (6) for the schema, then click Validate (7) to validate all properties, you should see in green No errors found, finally click X (8) to close/save the Joiner properties.
  1. The Group By Properties configuration box shows up. In the Group by fields section (1) add the following fields: Airline, Airline_code, and Airline_name, in the Aggregates (2) section input “Departure_schedule”, select “Count” as the aggregation and input “Flight_count” as an alias, click Get Schema (3), then click Validate (4) to validate all properties, you should see in green No errors found, finally click X (5) to close/save the Group By properties.
  1. The GCS Properties configuration page shows up. Assign the following value for Label (1): “GCS — Flight Count”, then input the following value for Reference Name (2): “gcs_flight_count”, provide the Path (3) to the GCS Bucket for output you have created earlier. For the purpose of this exercise I have include mine, make sure to supply your appropriate path (if you input what is provided in this image, the validation won’t succeed), select “csv” for the Format (4) of the output file, click Validate (5) to validate all properties, you should see in green No errors found, finally click X (6) to close/save the GSC properties.
  1. The pipeline will go through different cycles; Deployed, Provisioning, Starting, Running, Deprovisioning and Succeeded indicated in the Status on the pipeline page. Once the pipeline Succeeded the next step is to go explore the output file in your GCS bucket.
  1. To validate the output sink of your pipeline, head over to your GCS bucket to the output folder and issue the following gsutil command to view the results. Make sure to replace [BUCKET_NAME] and [REPLACE_WITH_YOUR_FOLDER_DATE] with your information:
  • gsutil cat -h gs://[BUCKET_NAME]]/output/flight_count/[REPLACE_WITH_YOUR_FOLDER_DATE]/part-r-00000
  1. The output is similar to the following:

That’s it! You’ve just created and ran a complete data pipeline process on Cloud Data Fusion.

Cleanup

To avoid incurring charges to your Google Cloud Platform account for the resources used in this walkthrough:

If you want to delete the entire project, follow these instructions:

  1. In the GCP Console, go to the Manage resources page.
  2. In the project list, select the project you want to delete and click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Or if you just want to delete your Cloud Data Fusion instance follow these instructions:

  1. To view your existing Cloud Data Fusion instances, open the Instances page.
  2. To select an instance, check the box next to the instance name.
  3. To delete the instance, click Delete.

: You can also delete an instance by clicking Delete on the instance details page.

Enjoy!

Originally published at https://stephanefrechette.dev on November 24, 2019.

--

--

Stéphane Fréchette
Google Cloud - Community

Cloud Solution Architect | Databases | Data Engineering | Analytics. Drums, good food, fine wine.