An Introduction to Hadoop on AWS EMR.

Abhilash Mohapatra
Published in Analytics Vidhya · Aug 1, 2020

Big data is an integral part of machine learning. Here we are going to process the Freddie Mac Monthly Loan dataset stored in S3 using EMR. Hive is used to process the data and store the processed output back in S3 for the subsequent machine learning steps. The sample dataset has nearly 1 billion records. Below is a graphical representation of the activities to be performed.

Contents:

  1. Launch EMR Cluster and Understand file systems.
  2. Transfer data between PC, S3, EMR Local and HDFS.
  3. Create Table in Hive, Pre-process and Load data to hive table.
  4. Data Analysis and Optimization.
  5. Clone an existing table.
  6. Complex and String Data Types.
  7. Working with Views.
  8. Bootstrap during launch of EMR cluster.
  9. Data transfer between HDFS and RDBMS.

Tools/Software Used:

  1. Storage — AWS S3.
  2. Processing — AWS EMR.
  3. Languages — FS Shell and HQL.

1. Launch EMR Cluster and Understand file systems:

Please follow the link to launch an EMR cluster and connect to it using Git Bash (to be installed beforehand on the local PC) and the .pem key file (downloaded from AWS during key pair creation). Please enable port 22 on the EMR cluster for SSH access.

Once the EMR cluster is launched and connected, the screen below is where we continue working with EMR.

EMR CLI

In EMR we have both the local (Linux) and Hadoop (HDFS) file systems. The images below show the default folders and the newly created test folders in each.

EMR Local

From the above screen-prints, the EMR local home folder is /home/hadoop/; the test folder freddie-local was created inside this default folder.

EMR HDFS

From the above, the default EMR HDFS folder is /user/hadoop/, as the test folder freddie-hdfs was created in /user/hadoop/. The default Hive folder is /user/hive/warehouse/; all tables created in Hive store their data there unless a different LOCATION parameter is specified at table creation. To access EMR local, use plain Linux CLI commands, while to access EMR HDFS we need to prefix the command with “hadoop fs” and a “-”, as shown above.
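A minimal sketch of the difference, using the test folders freddie-local and freddie-hdfs from the screen-prints above:

    # EMR local file system: plain Linux commands
    mkdir /home/hadoop/freddie-local
    ls /home/hadoop/

    # EMR HDFS: prefix the command with "hadoop fs" and a dash
    hadoop fs -mkdir /user/hadoop/freddie-hdfs
    hadoop fs -ls /user/hadoop/
    hadoop fs -ls /user/hive/warehouse/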

On EMR, the “hive” command launches the Hive CLI as shown. We can also work with Hive using Hue; please follow the link to launch Hue and access Hive.

EMR Hive CLI

We are now all set to work with Hive using the CLI and Hue. In this story, we will use the Hive CLI.

2. Transfer data between PC, S3, EMR Local and HDFS:

a. PC to EMR Local — Data can be transferred between the PC and EMR local using the commands below. To connect the local PC to the EMR cluster, the .pem key file mentioned earlier is required. Here we will transfer a test file freddie-mac.txt from the desktop to EMR local.

Desktop to EMR Local
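A sketch of the scp command used for this, assuming a key file named freddie.pem; the master public DNS is a placeholder:

    # Copy a file from the local desktop to the EMR master's local file system
    scp -i ~/freddie.pem ~/Desktop/freddie-mac.txt \
        hadoop@ec2-xx-xx-xx-xx.compute-1.amazonaws.com:/home/hadoop/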

b. S3 to EMR Local — “aws s3 cp” and “curl” can be used to move data between S3 and EMR local as below. Here we will transfer the file freddie-mac.txt from the S3 bucket freddie-mac-hgsh to the EMR local folder, i.e. /home/hadoop/.

S3 to EMR Local — 1
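A sketch of the copy with the AWS CLI, using the bucket name from the article:

    # Pull the file from S3 to the EMR local folder
    aws s3 cp s3://freddie-mac-hgsh/freddie-mac.txt /home/hadoop/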

We can also use “hadoop fs -get” to pull and “hadoop fs -put” to push data between S3 and EMR local as below.

S3 to EMR Local — 2
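A sketch of the same transfer with the Hadoop FS shell:

    # Pull from S3 to EMR local
    hadoop fs -get s3://freddie-mac-hgsh/freddie-mac.txt /home/hadoop/
    # Push from EMR local back to S3
    hadoop fs -put /home/hadoop/freddie-mac.txt s3://freddie-mac-hgsh/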

Curl can also be used to download data from S3 to EMR local as below.

S3 to EMR Local— 3
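A sketch with curl; this assumes the object is publicly readable or a pre-signed URL is used:

    # Download the object over HTTPS to EMR local
    curl -o /home/hadoop/freddie-mac.txt \
        https://freddie-mac-hgsh.s3.amazonaws.com/freddie-mac.txt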

c. EMR Local to HDFS — “hadoop fs -put” and “hadoop fs -get” can be used to move data between EMR local and HDFS as below.

EMR local to hdfs.
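A sketch of the put/get pair, using the test folder from earlier:

    # Push from EMR local to HDFS
    hadoop fs -put /home/hadoop/freddie-mac.txt /user/hadoop/freddie-hdfs/
    # Pull from HDFS back to EMR local
    hadoop fs -get /user/hadoop/freddie-hdfs/freddie-mac.txt /home/hadoop/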

d. S3 to EMR HDFS — “hadoop fs -cp” can be used to move data between S3 and EMR HDFS as below.

EMR hdfs to S3.
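A sketch of copying in both directions with hadoop fs -cp:

    # S3 to HDFS
    hadoop fs -cp s3://freddie-mac-hgsh/freddie-mac.txt /user/hadoop/freddie-hdfs/
    # HDFS back to S3
    hadoop fs -cp /user/hadoop/freddie-hdfs/freddie-mac.txt s3://freddie-mac-hgsh/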

e. Browse files in S3 and HDFS — “hadoop fs -cat” can be used to browse data in S3 and EMR HDFS as below. Here head, together with the pipe (“|”) character, is used to limit the number of rows displayed.

Browse S3 data.

Data was copied from EMR local to hdfs and browsed as shown below.

Browse Hdfs data.
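A sketch of browsing both locations, piping through head to limit the output:

    # Browse a file in S3
    hadoop fs -cat s3://freddie-mac-hgsh/freddie-mac.txt | head -n 10
    # Browse a file in HDFS
    hadoop fs -cat /user/hadoop/freddie-hdfs/freddie-mac.txt | head -n 10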

3. Create Table in Hive, Pre-process and Load data to hive table:

In Hive we can create external and internal (managed) tables. When an external table is dropped, only the schema is deleted and the underlying data is left in place. When an internal table is dropped, both the schema and the data are deleted.

When a table is created in Hive, /user/hive/warehouse/ is the default location where the table data is stored, but a different location can also be specified for the data to be stored or referenced when creating the table. The data location can be any folder under /user/* in EMR HDFS or in S3.

In the current project, the raw data is stored in S3 in Parquet format. We need to process it and store the final data back in S3 for the subsequent machine learning steps. We therefore do not need to keep the table data in HDFS, and the data should be retained even after the EMR cluster is terminated. Hence we create an external table with an S3 bucket as its location.

The schema was created using the Freddie Mac reference document (Monthly Performance Data File). The raw data file was collected and stored in S3. The last three fields, step modification flag, deferred payment modification and estimated loan to value (ELTV), are excluded from the raw input file.

Please refer to the screen-prints below for creating an external table over the raw data stored in S3.

Size of raw parquet file in S3.
Empty hive warehouse folder before new database creation.
Create database freddie.
Hive warehouse folder after database creation.
Create table with raw data.
Raw file record count.
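A minimal sketch of the database and raw external table DDL; the bucket prefix and the column subset shown here are assumptions (the actual table carries all fields of the Monthly Performance Data File except the three excluded ones):

    CREATE DATABASE IF NOT EXISTS freddie;

    CREATE EXTERNAL TABLE IF NOT EXISTS freddie.loan_raw (
      loan_seq_no        STRING,
      monthly_rep_period STRING,
      current_upb        DOUBLE,
      current_dlq_status STRING
      -- ... remaining columns from the reference layout
    )
    STORED AS PARQUET
    LOCATION 's3://freddie-mac-hgsh/raw/';

    SELECT COUNT(*) FROM freddie.loan_raw;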

Please refer to the screen-prints below for creating an external table to store the processed data in S3 and loading data into it.

Create processed table to store processed data in S3.
Load data to processed table.
Processed file record count.
Raw and Processed file.
Enable header and view processed file.
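A sketch of the processed external table and the load, again with hypothetical column and bucket names:

    CREATE EXTERNAL TABLE IF NOT EXISTS freddie.loan_processed (
      loan_seq_no        STRING,
      monthly_rep_period STRING,
      current_upb        DOUBLE,
      current_dlq_status STRING
    )
    STORED AS PARQUET
    LOCATION 's3://freddie-mac-processed-hgsh/processed/';

    INSERT OVERWRITE TABLE freddie.loan_processed
    SELECT loan_seq_no, monthly_rep_period, current_upb, current_dlq_status
    FROM freddie.loan_raw;

    SELECT COUNT(*) FROM freddie.loan_processed;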

As the table location was set to an S3 bucket, the output was written to S3 instead of the default location /user/hive/warehouse/, as shown below.

Processed data created in o/p S3 bucket.

Once the above processing is done, we terminate the EMR cluster and launch a new one later for further analysis and processing.

4. Data Analysis and Optimization:

A new EMR cluster was launched as described previously. Two separate Git Bash sessions were used, one connected to Hadoop and the other to Hive. Once connected to Hive, we create a new database and a table over the processed data created earlier in S3.

New cluster launched.
Create new database and table using processed S3 data.

a. The data has records from 2000 to 2017, as shown below.

No of distinct years in our data.

b. Select all the monthly performance data for loan ID F107Q1287729 in the year 2007.

Monthly performance data of loan F107Q1287729 in year 2007.
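A sketch of the query; the table and column names are illustrative (loan_monthly_s3 stands for the table created over the processed S3 data), while the loan ID is the one from the article:

    SELECT *
    FROM loan_monthly_s3
    WHERE loan_seq_no = 'F107Q1287729'
      AND substr(monthly_rep_period, 1, 4) = '2007';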

c. For all the above queries, data is fetched from S3, as the table's data location is an S3 bucket. Instead of reading from S3, we can also move the data into Hive by creating another table, as shown below, for further processing.

New table is created in hdfs with default location.
Describe formatted — pg1
Describe formatted — pg2
New folder with name loan_monthly created in default hive data location.
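A sketch of pulling the data into HDFS with a CTAS into the default warehouse location; the names are illustrative:

    -- Creates loan_monthly under /user/hive/warehouse/ by default
    CREATE TABLE loan_monthly
    STORED AS PARQUET
    AS
    SELECT * FROM loan_monthly_s3;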

d. Find the count of loan IDs with current loan delinquency status = 0 (delinquent for less than 30 days) in the month of September 2010.

count distinct with certain conditions in particular year.
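A sketch of the count, assuming the reporting period is stored as YYYYMM and the same illustrative column names:

    SELECT COUNT(DISTINCT loan_seq_no)
    FROM loan_monthly
    WHERE current_dlq_status = '0'
      AND monthly_rep_period = '201009';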

e. One way of optimizing Hive queries is to create partitioned tables. Partitions should be chosen based on the most common type of queries run against the table; done carelessly, partitioning can actually make queries more costly. Here we assume that our data is mostly queried by date or year, hence we create a table partitioned by year. This is dynamic partitioning, as the partition values are derived from a data field of each row, as sketched below.
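A sketch of the partitioned table and the dynamic-partition load attempted here, with the same illustrative column names:

    SET hive.exec.dynamic.partition = true;
    SET hive.exec.dynamic.partition.mode = nonstrict;

    CREATE TABLE loan_by_yearly (
      loan_seq_no        STRING,
      monthly_rep_period STRING,
      current_upb        DOUBLE,
      current_dlq_status STRING
    )
    PARTITIONED BY (period_year STRING)
    STORED AS PARQUET;

    -- The partition column must come last in the SELECT list
    INSERT OVERWRITE TABLE loan_by_yearly PARTITION (period_year)
    SELECT loan_seq_no, monthly_rep_period, current_upb, current_dlq_status,
           substr(monthly_rep_period, 1, 4) AS period_year
    FROM loan_monthly;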

Default hive warehouse folder before creation of partitioned table.
Create partitioned table partitioned by year.
Hive warehouse after table creation.
Partitioned column period_year appears as a column when we use describe.
Load query stuck, mostly due to memory error.

After checking YARN, it turned out that no container had been allocated to the job.

YARN Application Error.
HDFS memory report pg-1
HDFS memory report pg-2
Actual File Size in S3 vs File Size in HDFS

Due to the limited memory in my EMR cluster, I deleted the table loan_monthly and created a new external table loan_monthly with its data location set to the S3 bucket, then used that external table to load data into the partitioned table loan_by_yearly. Along with the link, below are a few screen-prints depicting YARN and the cluster nodes while the data was being loaded.

Newly created External Table loan_monthly.
Loading data into partitioned table.
Total 8 containers are allocated. 7 used for task and 1 used for switch.

As only two core nodes were allocated while creating the EMR cluster, only those two were used while loading data into the partitioned table.

EMR Cluster Nodes Matrix during data loading into partitioned table.
Data Loading completed into partitioned table.

Sub-folders are created per value of period_year, the column on which partitioning was done.

Folders created in hive warehouse according to year.

The column by which the table is partitioned appears as a regular column when the table is viewed or queried, but it is not actually stored in the data files.

10 rows from partitioned table.

Executing the same query as before took only 52 seconds instead of 575 seconds, roughly a 10x improvement in this case.

count distinct with certain conditions in particular year.

The data is copied directly from EMR HDFS to S3, to be reused the next time an EMR cluster is launched.

Move file between EMR hdfs and S3.
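A sketch of the copy; the warehouse path and bucket prefix are assumptions:

    hadoop fs -cp /user/hive/warehouse/freddie.db/loan_by_yearly \
        s3://freddie-mac-hgsh/loan_by_yearly/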

Later, when a new EMR cluster is launched, a new partitioned table is created over the data in the S3 location as below. This is called static partitioning.

Partitioned data in S3.
Create partitioned table.
Add partitioned data to table pg-1.
Add partitioned data to table pg-2.
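A sketch of restoring the partitioned table over the S3 data with static partitioning; names and paths are illustrative:

    CREATE EXTERNAL TABLE loan_by_yearly_restored (
      loan_seq_no        STRING,
      monthly_rep_period STRING,
      current_upb        DOUBLE,
      current_dlq_status STRING
    )
    PARTITIONED BY (period_year STRING)
    STORED AS PARQUET
    LOCATION 's3://freddie-mac-hgsh/loan_by_yearly/';

    -- Register each partition explicitly (static partitioning)
    ALTER TABLE loan_by_yearly_restored
      ADD PARTITION (period_year = '2000')
      LOCATION 's3://freddie-mac-hgsh/loan_by_yearly/period_year=2000/';
    -- ... repeat for 2001-2017, or run: MSCK REPAIR TABLE loan_by_yearly_restored;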

The total record count of the restored partitioned table is equal to the record count of the processed table, 1064685806, as shown in section 3.

Record count of restored partitioned table.

Also, from the above, the total time taken for the count on the partitioned and non-partitioned tables is nearly the same, as partitioning was done by year and this count query is not year specific.

A few records from the restored partitioned table. The table includes the partition field period_year.

5. Clone an existing table:

LIKE can be used to catalogue or clone an existing table. Only the schema is created; there is no data transfer, so data needs to be loaded explicitly into the newly cloned table. IF NOT EXISTS is used to avoid an error when creating a database or table if one with the same name already exists.

Hive warehouse before creation of table.
Table catalogued using LIKE.
New empty folder is created in hive warehouse.
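A sketch of the clone, with illustrative names:

    CREATE TABLE IF NOT EXISTS loan_monthly_like LIKE loan_monthly;
    -- Only the schema is copied; data must be loaded separately, e.g.:
    -- INSERT INTO loan_monthly_like SELECT * FROM loan_monthly;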

6. Complex and String Data Types:

Hive supports complex data types such as ARRAY, MAP and STRUCT. Twitter data collected using Kinesis Firehose has been stored in S3, and this data is used to demonstrate creating tables with, loading, and querying complex data.

Create a temporary table in Hive to access the raw Twitter data. Data can also be loaded into a Hive table from S3 as shown below. The LOAD command moves (not copies) the data from the source to the target location.

Create table and load data from S3.
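A sketch of the staging table and the load; the bucket and file names are placeholders, and note that LOAD DATA moves the file out of its source location:

    CREATE TABLE tweets_raw (line STRING);

    -- Moves (does not copy) the file from S3 into the table's data folder
    LOAD DATA INPATH 's3://twitter-firehose-hgsh/raw/tweets.txt'
    INTO TABLE tweets_raw;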

Create complex data in a temp table by processing the string data as below. If different complex data types are used in the same table, all of them must use the same terminator. Here “|” is used as the terminator for the array, map and struct.

Load data into temp table to create complex data.
View created complex data.

Create a table with complex data types and copy the processed data into the complex table's data folder. COLLECTION ITEMS TERMINATED BY ‘|’ is used in the table definition, as the data was created accordingly in the previous step.

Create a table with complex data type.
Describe table with complex data types.
Move created data to complex table data folder.
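A sketch of a table with all three complex types, using ‘|’ as the collection items terminator as in the article; the column names are illustrative:

    CREATE TABLE tweets_complex (
      user_name STRING,
      hashtags  ARRAY<STRING>,
      counters  MAP<STRING, INT>,
      place     STRUCT<city:STRING, country:STRING>
    )
    ROW FORMAT DELIMITED
      FIELDS TERMINATED BY ','
      COLLECTION ITEMS TERMINATED BY '|'
      MAP KEYS TERMINATED BY ':';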

Complex data is accessed from hive tables as shown below.

Access complex data in hive tables.
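A sketch of accessing each type directly, with the same illustrative columns:

    SELECT user_name,
           hashtags[0],             -- first element of the array
           counters['retweets'],    -- value for one map key
           place.city               -- one field of the struct
    FROM tweets_complex;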

The explode function can also be used on its own to expand an Array or Map, as shown below.

using only explode function to expand Array and Map.
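A sketch of explode() on its own:

    -- One row per array element
    SELECT explode(hashtags) AS hashtag FROM tweets_complex;
    -- One row per map entry, with key and value columns
    SELECT explode(counters) AS (counter_name, counter_value) FROM tweets_complex;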

When using the explode function alongside other columns, LATERAL VIEW must be used, as below. Although the aliases la and lm are not referenced anywhere else, they still need to be defined with LATERAL VIEW.

Explode with Lateral View only for Array and Map.
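A sketch combining explode() with other columns via LATERAL VIEW, keeping the aliases la and lm from the article:

    SELECT user_name, hashtag, counter_name, counter_value
    FROM tweets_complex
    LATERAL VIEW explode(hashtags) la AS hashtag
    LATERAL VIEW explode(counters) lm AS counter_name, counter_value;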

7. Working with Views:

A view is a saved query which can be queried just like a table. If a complex query is used again and again, it can be saved as a view to save time. A view is visible as a table in the database.

Non-materialized views are created by default in Hive; they do not store the underlying query's data. The view's operations are merged with the operations of the outer query into a single plan that Hive optimizes before execution. A materialized view, in contrast, saves the query result to the database, making queries against it faster. Consequently, when the underlying data changes, a non-materialized view reflects it automatically, while a materialized view needs to be rebuilt.

ORDER BY, being an expensive operation, should never be used inside a view. If required, sorting should be applied in the query run against the view.

Create table dayofweek from data present in S3.

Create table for day of week.
Load data into the Hive table from the source; here S3 is used as the source.
View data of table dayofweek.

A non-materialized and a materialized view were created using the processed and day-of-week data as shown below. Creation is quick for the non-materialized view, while query time is quick for the materialized one.

Creating and querying non materialized view.
Creating and querying materialized view.
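A sketch of both kinds of view over illustrative tables and columns; note that Hive's materialized views additionally require the source table to meet its managed/transactional prerequisites:

    -- Non-materialized (default): only the query text is stored
    CREATE VIEW delinquent_loans_vw AS
    SELECT loan_seq_no, monthly_rep_period
    FROM loan_monthly
    WHERE current_dlq_status <> '0';

    -- Materialized: the result set is stored, so it must be rebuilt when data changes
    CREATE MATERIALIZED VIEW delinquent_loans_mv AS
    SELECT loan_seq_no, monthly_rep_period
    FROM loan_monthly
    WHERE current_dlq_status <> '0';

    ALTER MATERIALIZED VIEW delinquent_loans_mv REBUILD;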

Both views are visible as tables in the database, but a data folder is created only for the materialized view.

Views visible as tables in database.
Data only created for materialized view.

Views can also be modified, similarly to tables, as depicted below.

Describe Formatted of original view.
Alter underlying view query.
Describe Formatted of modified view.
Dropping views.
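A sketch of altering and dropping views, with the same illustrative names:

    -- Replace the underlying query of the view
    ALTER VIEW delinquent_loans_vw AS
    SELECT loan_seq_no, monthly_rep_period, current_upb
    FROM loan_monthly
    WHERE current_dlq_status <> '0';

    DROP VIEW IF EXISTS delinquent_loans_vw;
    DROP MATERIALIZED VIEW delinquent_loans_mv;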

8. Bootstrap during launch of EMR cluster:

Hive queries can be saved to S3 and that script location can be supplied as a bootstrap action when launching the EMR cluster. This executes the respective scripts automatically as part of the cluster launch.
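As one concrete way of wiring this up, the saved script can also be attached as an EMR step at cluster creation (shown here as an alternative to the bootstrap action described above); all names, paths and sizes are placeholders:

    aws emr create-cluster \
        --name "freddie-cluster" \
        --release-label emr-6.2.0 \
        --applications Name=Hadoop Name=Hive \
        --instance-type m5.xlarge --instance-count 3 \
        --use-default-roles \
        --steps Type=HIVE,Name=RunHiveScript,ActionOnFailure=CONTINUE,Args=[-f,s3://freddie-mac-hgsh/scripts/create_tables.q]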

9. Data transfer between HDFS and RDBMS:

Sqoop, executed from the Linux shell, is used to transfer data between HDFS and an RDBMS, in both the import and export directions.
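A sketch of both directions with Sqoop; the JDBC URL, credentials and table names are placeholders:

    # Import an RDBMS table into HDFS
    sqoop import \
        --connect jdbc:mysql://dbhost:3306/freddie \
        --username dbuser --password-file /user/hadoop/.dbpass \
        --table loan_monthly \
        --target-dir /user/hadoop/loan_monthly_rdbms

    # Export an HDFS directory back into an RDBMS table
    sqoop export \
        --connect jdbc:mysql://dbhost:3306/freddie \
        --username dbuser --password-file /user/hadoop/.dbpass \
        --table loan_monthly \
        --export-dir /user/hadoop/loan_monthly_rdbms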

More details to be updated. Your thoughts will be appreciated.

Thanks, Abhi.
