Data Engineering: Data transformation using AWS CloudFormation, Apache Airflow, Apache Livy and PySpark

shwetha kamath
Feb 15, 2020


As part of my Udacity Nanodegree program, I learned several current open-source data engineering technologies for transforming large and complex datasets. In this article, I describe the project I completed with them. I use US immigration data along with global temperature data, a US airport codes dataset and US city demographic data. We will use AWS EMR, Apache Airflow, Apache Livy and PySpark.

When we deal with multiple large datasets, we come across questions like: where is the data coming from? How often does the data need to be updated? Answering these requires a deep understanding of the data. Once we decide on the data model, we have to clean up the raw data, filter it, combine it and aggregate it. The data might also need some transformations to fit our needs. Finally, we have to check for data quality. These tasks need to be managed and triggered based on their dependencies. On top of these steps, we also need to address scalability. In this article, let's find out how we can design a scalable system that addresses all of these issues.

First, AWS EMR and Apache Spark are well suited to building large-scale data pipelines. Apache Spark is well known for fast data processing using in-memory computation. In this project, I use the PySpark API, and I will show how to build a data lake using Apache Spark with PySpark.
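To give a feel for what one such PySpark step could look like, here is a minimal sketch that reads a raw CSV, tidies the column names and writes the result back as Parquet. It is not the project's actual transformation code: the function names, the paths and the idea of normalizing headers are my assumptions for illustration. The only Spark behavior it relies on is that Parquet does not accept column names containing spaces.

```python
import re


def normalize_column_name(name: str) -> str:
    """Turn a header such as 'Median Age' into 'median_age'."""
    # Replace every run of non-alphanumeric characters with one underscore.
    cleaned = re.sub(r"[^0-9a-zA-Z]+", "_", name.strip())
    return cleaned.strip("_").lower()


def raw_csv_to_parquet(spark, raw_path: str, out_path: str, sep: str = ","):
    """Read a raw CSV, tidy it, and write it back to the lake as Parquet.

    `spark` is an existing SparkSession. Both paths are hypothetical,
    for example s3://my-bucket/data/raw/demographics/ for the input.
    """
    df = spark.read.csv(raw_path, header=True, sep=sep, inferSchema=True)
    # Parquet rejects column names containing spaces, so rename them first.
    df = df.toDF(*[normalize_column_name(c) for c in df.columns])
    # Drop exact duplicate rows before storing the cleaned table.
    df = df.dropDuplicates()
    df.write.mode("overwrite").parquet(out_path)
    return df
```

On a cluster, the function would be called with the SparkSession of the running job; the header helper can be tried locally without Spark.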

To orchestrate the data pipeline in this project, I use Apache Airflow. Airflow is easy to set up: with a few commands, you can have a workflow management tool up and running. It has a powerful user interface and can be extended and modeled in Python.

We will use Apache Livy to submit the Spark jobs to EMR through a REST interface.
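As a rough illustration of what submitting a job through Livy's REST interface involves, the sketch below posts a PySpark script to Livy's /batches endpoint using only the Python standard library. The helper names, the script location and the host name are hypothetical, and the project itself may talk to Livy differently (for example through interactive sessions); 8998 is Livy's default port.

```python
import json
import urllib.request

LIVY_PORT = 8998  # Livy's default port


def build_batch_payload(script_path, args=None, conf=None):
    """Build the JSON body for Livy's POST /batches endpoint."""
    payload = {"file": script_path}
    if args:
        # Livy expects command-line arguments as a list of strings.
        payload["args"] = [str(a) for a in args]
    if conf:
        payload["conf"] = dict(conf)
    return payload


def submit_batch(master_dns, payload):
    """POST the batch to Livy on the EMR master node and return the parsed reply."""
    request = urllib.request.Request(
        url=f"http://{master_dns}:{LIVY_PORT}/batches",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


# Hypothetical usage, assuming the script was uploaded to S3 beforehand:
# reply = submit_batch("<emr-master-public-dns>",
#                      build_batch_payload("s3://my-bucket/jobs/transform.py", args=["2016-01"]))
```

The reply describes the created batch, including an id and a state that can be polled until the job finishes.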

We use an AWS CloudFormation script to launch the AWS services required for this workflow. CloudFormation is a powerful service that lets you describe and provision all the infrastructure and resources required for your cloud environment in simple JSON or YAML templates. You can refer to the Amazon CloudFormation article, which goes into the details of setting up the cloud. The CloudFormation script creates an EC2 instance for Apache Airflow. It also creates IAM roles and an S3 bucket, and sets up a Postgres DB instance for Airflow. See the picture below.

AWS CloudFormation

The CloudFormation script also sets up Apache Airflow. We use the LocalExecutor, which needs a real database backend, hence the Postgres DB. Airflow's default executor is the SequentialExecutor.
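For reference, switching Airflow to the LocalExecutor with a Postgres backend comes down to two settings. The sketch below builds them as environment variables, which Airflow reads using the AIRFLOW__SECTION__KEY naming scheme. The variable names match the Airflow 1.10 releases that were current when this article was written (newer releases moved the connection string to a different section); the function name, host, user and database names are placeholders, and the actual CloudFormation script may configure this differently, for example by editing airflow.cfg.

```python
from urllib.parse import quote_plus


def airflow_env(db_user, db_password, db_host, db_name, db_port=5432):
    """Return environment variables that select LocalExecutor with Postgres."""
    # Quote the password so characters such as '@' do not break the URL.
    conn = (
        f"postgresql+psycopg2://{db_user}:{quote_plus(db_password)}"
        f"@{db_host}:{db_port}/{db_name}"
    )
    return {
        "AIRFLOW__CORE__EXECUTOR": "LocalExecutor",
        "AIRFLOW__CORE__SQL_ALCHEMY_CONN": conn,
    }
```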

Before we set up the stack and go through the other steps, let’s talk about the data itself.

Data Set

The following datasets are combined, transformed into fact and dimension tables, and stored on S3.

  • I94 Immigration Data: This data comes from the US National Tourism and Trade Office. Udacity provided the dataset for the year 2016 along with a data dictionary. I have uploaded a sample data file to GitHub.
  • World Temperature Data: This dataset came from Kaggle. You can read more about it here.
  • U.S. City Demographic Data: This data comes from OpenDataSoft. You can read more about it here.
  • Airport Code Table: This is a simple table of airport codes and corresponding cities. It comes from here.

Now let’s focus on setting up the cloud infrastructure.

Steps for creating the stack:

In the AWS console, go to the CloudFormation service.

CloudFormation service.

Choose Create stack and upload the template; in my case it is cloudformation.yaml. Provide a DB password and select the key pair. If you do not have a key pair, please create one. Give a name for the S3 bucket. See the steps below.

Create stack: choose the template
Provide the details required by the stack creation script

Click Next -> Next, then select the “I acknowledge that AWS CloudFormation might create IAM resources with custom names” checkbox.

It will take some time to create the stack.
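The same stack could also be created from a script instead of the console. The following is a minimal sketch using boto3, not part of the original project. The stack name and the parameter keys (DBPassword, KeyName, S3BucketName) are hypothetical and must match whatever the template declares, and the us-west-2 default region is only inferred from the host name used later in this article. The CAPABILITY_NAMED_IAM capability is the scripted counterpart of the console acknowledgment about IAM resources with custom names.

```python
def to_cfn_parameters(values):
    """Convert {'Key': 'value'} into the list-of-dicts shape CloudFormation expects."""
    return [
        {"ParameterKey": key, "ParameterValue": str(value)}
        for key, value in sorted(values.items())
    ]


def create_stack(stack_name, template_path, parameters, region="us-west-2"):
    """Create the stack from a local template and block until it is ready."""
    import boto3  # imported lazily so the helper above works without boto3

    with open(template_path) as handle:
        template_body = handle.read()

    cfn = boto3.client("cloudformation", region_name=region)
    cfn.create_stack(
        StackName=stack_name,
        TemplateBody=template_body,
        Parameters=to_cfn_parameters(parameters),
        # Required because the template creates IAM resources with custom names.
        Capabilities=["CAPABILITY_NAMED_IAM"],
    )
    # Poll until CloudFormation reports CREATE_COMPLETE (raises on failure).
    cfn.get_waiter("stack_create_complete").wait(StackName=stack_name)


# Hypothetical usage; the parameter keys must match the template:
# create_stack("airflow-stack", "cloudformation.yaml",
#              {"DBPassword": "change-me", "KeyName": "spark-cluster", "S3BucketName": "bucketname"})
```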

Making raw data available.

The raw datasets listed above are moved to S3 by a Python script using boto3.

import configparser
import os

import boto3

# Read the AWS credentials from a local config file and expose them to boto3.
config = configparser.ConfigParser()
config.read('dl.cfg')
os.environ["AWS_ACCESS_KEY_ID"] = config.get("AWS", "AWS_ACCESS_KEY_ID")
os.environ["AWS_SECRET_ACCESS_KEY"] = config.get("AWS", "AWS_SECRET_ACCESS_KEY")

s3_client = boto3.client('s3')
S3_BUCKET = 'bucketname'
KEY_PATH = "data"


def uploadToS3(f_name, s3_f_name):
    """Upload one local file to the bucket under KEY_PATH + s3_f_name."""
    try:
        s3_client.upload_file(f_name, S3_BUCKET, KEY_PATH + s3_f_name)
    except Exception as e:
        print(e)


def uploadImmigrationDataS3():
    root = "../../data/18-83510-I94-Data-2016/"
    files = [root + f for f in os.listdir(root)]
    for f in files:
        uploadToS3(f, "/raw/i94_immigration_data/18-83510-I94-Data-2016/" + f.split("/")[-1])


def uploadDemographics():
    uploadToS3("us-cities-demographics.csv", "/raw/demographics/us-cities-demographics.csv")


def uploadGlobalTemperatures():
    root = "../../data2/"
    files = [root + f for f in os.listdir(root)]
    for f in files:
        uploadToS3(f, "/raw/globaltemperatures/" + f.split("/")[-1])


def uploadAirportCode():
    uploadToS3("airport-codes_csv.csv", "/raw/airportcode/airport-codes_csv.csv")


def uploadCodes():
    # Lookup files extracted from the I94 data dictionary.
    uploadToS3("i94addrl.txt", "/raw/codes/i94addrl.txt")
    uploadToS3("i94cntyl.txt", "/raw/codes/i94cntyl.txt")
    uploadToS3("i94prtl.txt", "/raw/codes/i94prtl.txt")
    uploadToS3("i94model.txt", "/raw/codes/i94model.txt")
    uploadToS3("i94visa.txt", "/raw/codes/i94visa.txt")


# Upload every raw dataset.
uploadImmigrationDataS3()
uploadDemographics()
uploadGlobalTemperatures()
uploadAirportCode()
uploadCodes()

EMR setup:

Airflow is now up and running on the EC2 instance created by our CloudFormation script. The script takes care of installing Airflow on the instance and copying the project from the Git repository. To start the Airflow scheduler, log into the EC2 instance. Once logged in, you can also check /var/logs/user_data.log if you need to debug the CloudFormation script.

Connect to the EC2 machine using ssh.

$ ssh -i ~/amazon_credentials/spark-EMR-spark-cluster-keypair/spark-cluster.pem ec2-user@ec2-52-37-128-107.us-west-2.compute.amazonaws.com

Start the Airflow scheduler: switch to the root user with “su”, source ~/.bash_profile, and then run

$ airflow scheduler

Now copy the public DNS name of the EC2 instance. Open a browser and connect to Airflow on port 8080.

Our DAG, immigration_etl_dag, is scheduled to run every month and backfills the data from 01/01/2016. See the DAG configuration:

from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'decapstone-immigration',
    'start_date': datetime(2016, 1, 1, 0, 0, 0, 0),
    'end_date': datetime(2016, 4, 1, 0, 0, 0, 0),
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'email_on_retry': False,
    'provide_context': True
}

# Initialize the DAG that transforms the data from S3 using Spark and creates normalized datasets
dag = DAG('immigration_etl_dag',
          default_args=default_args,
          concurrency=3,
          catchup=True,
          description='Load and transform data for immigration project',
          max_active_runs=1,
          schedule_interval="@monthly"
          )

Here is the workflow that creates the EMR cluster, performs the data transformations, checks for data quality, and finally terminates the cluster.

Since creating the cluster might take some time, we need to wait for it to become available before starting the data transformations. Hence we have created a sensor operator that pokes and waits until the cluster status becomes “Waiting”.
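The poke-and-wait idea behind such a sensor can be sketched in plain Python. This is not the project's sensor operator, only an illustration of the logic: the function names, poke interval and timeout are my assumptions, while the state names and the describe_cluster call are those of the EMR API.

```python
import time

READY_STATE = "WAITING"
FAILED_STATES = {"TERMINATING", "TERMINATED", "TERMINATED_WITH_ERRORS"}


def wait_for_cluster(get_state, poke_interval=60, timeout=3600, sleep=time.sleep):
    """Poll get_state() until the cluster is WAITING; fail fast if it dies."""
    waited = 0
    while True:
        state = get_state()
        if state == READY_STATE:
            return True
        if state in FAILED_STATES:
            raise RuntimeError(f"Cluster entered state {state}")
        if waited >= timeout:
            raise TimeoutError(f"Cluster still {state} after {waited} seconds")
        sleep(poke_interval)
        waited += poke_interval


def emr_state_getter(cluster_id, region="us-west-2"):
    """Return a function that asks EMR for the current cluster state."""
    import boto3  # imported lazily so wait_for_cluster can be tried without boto3

    emr = boto3.client("emr", region_name=region)
    return lambda: emr.describe_cluster(ClusterId=cluster_id)["Cluster"]["Status"]["State"]


# Hypothetical usage with a real cluster id:
# wait_for_cluster(emr_state_getter("j-XXXXXXXXXXXXX"))
```

Passing the state lookup in as a function keeps the waiting logic independent of AWS, so it can be exercised with a fake sequence of states.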

Once the cluster is available, we start the transformations. This is done by submitting the Spark jobs to EMR using Apache Livy, which lets us submit a job as a REST web service call. Once all transformations are done, we check for data quality. You can do this by checking the data itself and also by checking the S3 location where the data is stored.
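One simple form of data quality check is to verify that every output table actually contains rows. The sketch below assumes the row counts have already been collected, for instance with spark.read.parquet(path).count() for each table; the function name and the table names in the usage example are hypothetical and not taken from the project.

```python
def check_row_counts(row_counts, minimum_rows=1):
    """Raise ValueError if any table has fewer rows than minimum_rows."""
    failures = [
        f"{table}: {count} rows"
        for table, count in sorted(row_counts.items())
        if count < minimum_rows
    ]
    if failures:
        raise ValueError("Data quality check failed for " + ", ".join(failures))
    return True


# Hypothetical usage with counts gathered from the transformed tables:
# check_row_counts({"immigration_fact": 2847924, "airport_dim": 0})  # raises ValueError
```

Raising an exception makes the Airflow task fail, which is what marks the run as unsuccessful in the UI.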

Finally, we need to terminate the cluster. This has to happen irrespective of whether the transformations were successful. To do this, I set the trigger rule to “all_done”; by default it is “all_success”.

terminate_cluster = TerminateEMRClusterOperator(
    task_id="terminate_cluster",
    dag=dag,
    trigger_rule="all_done",
    emr_connection=emr_conn
)
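To make the difference between the two trigger rules concrete, here is a deliberately simplified model of how they evaluate the states of upstream tasks. It is not Airflow's implementation, and the function name is my own; it only mirrors the documented behavior that all_success requires every upstream task to succeed, while all_done only requires every upstream task to have finished.

```python
FINISHED_STATES = {"success", "failed", "upstream_failed", "skipped"}


def is_triggered(trigger_rule, upstream_states):
    """Simplified model of Airflow's all_success and all_done trigger rules."""
    if trigger_rule == "all_success":
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "all_done":
        return all(state in FINISHED_STATES for state in upstream_states)
    raise ValueError(f"Unsupported trigger rule: {trigger_rule}")
```

With one failed transformation upstream, is_triggered("all_success", ["success", "failed"]) is False, whereas is_triggered("all_done", ["success", "failed"]) is True, so the terminate task still runs and the cluster does not keep accruing cost.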

You can refer to my GitHub to look at the source code and project documentation.

Summary:

In this article, we went through the steps for orchestrating a Spark data pipeline using AWS EMR, Apache Livy and Apache Airflow.

There is room for improvement in this project. For example, all the steps currently run in a single DAG, but in practice the airport codes and demographics data do not change as often, so they could be moved to a separate DAG. Also, the transformation jobs wait for cluster availability using a base sensor. We could change this to use triggers.

The weather data we pulled in only goes up to 2013, so I did not join the immigration data with it. It would be interesting to see how weather affects travel.

You can refer to my GitHub repository to see the source code or view the project documentation. You can also connect with me on my LinkedIn profile.
