An Introduction to Hadoop on AWS EMR
Big Data is an integral part of Machine Learning. Here we process the Freddie Mac Monthly Loan dataset stored in S3 using EMR. Hive is used to process the data and to store the processed data back in S3 for later machine learning work. The sample dataset has roughly 1 billion records. Below is a graphical representation of the activities to be performed.
Contents:
- Launch EMR Cluster and Understand file systems.
- Transfer data between PC, S3, EMR Local and hdfs.
- Create Table in Hive, Pre-process and Load data to hive table.
- Data Analysis and Optimization.
- Clone an existing table.
- Complex and String Data Types.
- Working with Views.
- Bootstrap during launch of EMR cluster.
- Data transfer between HDFS and RDBMS.
Tools/Software Used:
- Storage — AWS S3.
- Processing — AWS EMR.
- Languages — FS Shell and HQL.
1. Launch EMR Cluster and Understand file systems:
Please follow the link to launch an EMR cluster, then connect to it using Git Bash (which must already be installed on the local PC) and the .pem file (downloaded from AWS during user creation). Port 22 (SSH) of the EMR master node must be open for access.
Once the EMR cluster is launched and connected, the screen below is the starting point for working with EMR.
EMR has both a local file system and the Hadoop file system (HDFS). The image below shows the default and newly created folders in the local and HDFS file systems.
From the screenshots above, the EMR local home folder is /home/hadoop/: the test folder freddie-local was created in the default folder /home/hadoop/.
Likewise, the default HDFS folder is /user/hadoop/, since the test folder freddie-hdfs was created in /user/hadoop/. The default Hive folder is /user/hive/warehouse/. Every table created in Hive stores its data under /user/hive/warehouse/ unless a different LOCATION is specified when the table is created. To work with the EMR local file system, use ordinary Linux CLI commands; to work with HDFS, prefix the command with “hadoop fs” and “-”, as shown above.
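The difference between the two file systems can be sketched with a handful of commands. This is only an illustration: the helper `run` and the `DRY_RUN` switch are assumptions added here so that the commands are printed by default and executed only when `DRY_RUN=0` is set on the EMR master node.

```shell
# Dry run by default: commands are only printed.
# Set DRY_RUN=0 on the EMR master node to execute them.
DRY_RUN="${DRY_RUN:-1}"
run() {
  echo "+ $*"
  if [ "$DRY_RUN" = "0" ]; then "$@"; fi
}

# EMR local file system: plain Linux commands.
run mkdir -p /home/hadoop/freddie-local
run ls -l /home/hadoop/

# HDFS: the same operations, prefixed with "hadoop fs -".
run hadoop fs -mkdir -p /user/hadoop/freddie-hdfs
run hadoop fs -ls /user/hadoop/

# Default Hive warehouse folder in HDFS.
run hadoop fs -ls /user/hive/warehouse/
```

Running the script as is only lists the commands; with `DRY_RUN=0` it creates the two test folders and lists the default folders.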
On EMR, the “hive” command launches the Hive CLI as shown. We can also work with Hive through Hue. Please follow the link to launch Hue and access Hive.
We are now all set to work with Hive using either the CLI or Hue. In this story, we will use the Hive CLI to interact with HDFS.
2. Transfer data between PC, S3, EMR Local and hdfs:
a. PC to EMR Local: Data can be transferred between the PC and EMR local using the commands below. Connecting the local PC to the EMR cluster requires the .pem file, which should have been created during user creation. Here we transfer a test file, freddie-mac.txt, from the desktop to EMR local.
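A minimal sketch of such a transfer with `scp`, run from Git Bash on the PC. The key file name, the desktop path and the master node address are placeholders (assumptions), and the `run` helper only prints the commands unless `DRY_RUN=0` is set.

```shell
# Dry run by default; set DRY_RUN=0 to execute the commands.
DRY_RUN="${DRY_RUN:-1}"
run() {
  echo "+ $*"
  if [ "$DRY_RUN" = "0" ]; then "$@"; fi
}

# Placeholders: replace with your own key file and master public DNS.
KEY_FILE="$HOME/my-emr-key.pem"
MASTER="hadoop@master-public-dns-placeholder"

# PC -> EMR local: copy the test file to /home/hadoop/.
run scp -i "$KEY_FILE" "$HOME/Desktop/freddie-mac.txt" "$MASTER:/home/hadoop/"

# EMR local -> PC: the same command with source and target swapped.
run scp -i "$KEY_FILE" "$MASTER:/home/hadoop/freddie-mac.txt" "$HOME/Desktop/"
```

The login user on an EMR node is `hadoop`, which is why the remote side is written as `hadoop@...`.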
b. S3 to EMR local: “aws s3 cp” and “curl” can be used to move data between S3 and EMR local as below. Here we transfer the file freddie-mac.txt from the S3 bucket freddie-mac-hgsh to the EMR local folder, i.e. /home/hadoop/.
We can also use “hadoop fs -get” to pull and “hadoop fs -put” to push data between S3 and EMR local as below.
Curl can also be used to download data from S3 to EMR local as below.
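The three options above could look roughly as follows. The object URL passed to `curl` is an assumption and only works if the object is publicly readable (or a presigned URL is used); the `run` helper prints the commands unless `DRY_RUN=0` is set.

```shell
# Dry run by default; set DRY_RUN=0 on the EMR master node to execute.
DRY_RUN="${DRY_RUN:-1}"
run() {
  echo "+ $*"
  if [ "$DRY_RUN" = "0" ]; then "$@"; fi
}

BUCKET="s3://freddie-mac-hgsh"

# Option 1: AWS CLI, in both directions.
run aws s3 cp "$BUCKET/freddie-mac.txt" /home/hadoop/
run aws s3 cp /home/hadoop/freddie-mac.txt "$BUCKET/"

# Option 2: Hadoop FS shell (-get pulls, -put pushes).
# Note: -get and -put fail if the target file already exists.
run hadoop fs -get "$BUCKET/freddie-mac.txt" /home/hadoop/freddie-local/
run hadoop fs -put /home/hadoop/freddie-local/freddie-mac.txt "$BUCKET/copy-of-freddie-mac.txt"

# Option 3: curl over HTTPS (assumes a publicly readable object).
run curl -o /home/hadoop/freddie-mac.txt \
  "https://freddie-mac-hgsh.s3.amazonaws.com/freddie-mac.txt"
```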
c. EMR Local to HDFS: “hadoop fs -put” and “hadoop fs -get” move data between EMR local and HDFS as below.
d. S3 to EMR HDFS: “hadoop fs -cp” copies data between S3 and EMR HDFS as below.
e. Browse files in S3 and HDFS: “hadoop fs -cat” prints the contents of files in S3 and EMR HDFS as below. Here the output is piped with “|” into head to limit the number of rows.
Data was copied from EMR local to HDFS and browsed as shown below.
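Steps c to e can be sketched as below. The target file names are chosen for illustration only, and the `run` helper prints the commands unless `DRY_RUN=0` is set on the EMR master node.

```shell
# Dry run by default; set DRY_RUN=0 on the EMR master node to execute.
DRY_RUN="${DRY_RUN:-1}"
run() {
  echo "+ $*"
  if [ "$DRY_RUN" = "0" ]; then "$@"; fi
}

BUCKET="s3://freddie-mac-hgsh"
HDFS_DIR="/user/hadoop/freddie-hdfs"

# c. EMR local -> HDFS, and back again into the local test folder.
run hadoop fs -put /home/hadoop/freddie-mac.txt "$HDFS_DIR/"
run hadoop fs -get "$HDFS_DIR/freddie-mac.txt" /home/hadoop/freddie-local/

# d. S3 -> HDFS (a different target name avoids a "file exists" error).
run hadoop fs -cp "$BUCKET/freddie-mac.txt" "$HDFS_DIR/freddie-mac-from-s3.txt"

# e. Browse the first rows of a file in HDFS and in S3.
run sh -c "hadoop fs -cat $HDFS_DIR/freddie-mac.txt | head -n 5"
run sh -c "hadoop fs -cat $BUCKET/freddie-mac.txt | head -n 5"
```

The pipe has to be wrapped in `sh -c` here only because of the dry-run helper; typed directly at the prompt it is simply `hadoop fs -cat <file> | head -n 5`.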
3. Create Table in Hive, Pre-process and Load data to hive table:
In Hive we can create external and internal (managed) tables. For an external table, the data is not deleted when the table is dropped; only the schema is deleted. For an internal table, both the data and the schema are deleted when the table is dropped.
When a table is created in Hive, /user/hive/warehouse/ is the default location for the table data. We can also specify a different location where the table data is stored or referenced when creating the table. The data location can be any folder in EMR HDFS under /user/* or in S3.
In the current project, the raw data is stored in S3 in Parquet format. We need to process it and store the final data in S3 for later machine learning work. So we do not need to store the table data in HDFS, and the data should be retained even after the EMR cluster is terminated. Hence we create an external table and use an S3 bucket as its location.
The schema was created using the Freddie Mac reference document (Monthly Performance Data File). The raw data file was collected and stored in S3. The last three fields, step modification flag, deferred payment modification and estimated loan to value (ELTV), are excluded from the raw input file.
Please refer to the screenshots below for creating an external table from the raw data stored in S3.
Please refer to the screenshots below for creating an external table that stores the processed data in S3 and for loading data into that table.
As the table location was set to an S3 bucket, the output was written to the S3 bucket instead of the default location /user/hive/warehouse/, as below.
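As a rough sketch of this step, the script below writes the HQL to a file and runs it with `hive -f` only when `DRY_RUN=0` is set. Everything specific in it is an assumption for illustration: the raw table name, the five columns (the real file has many more, and the names must match the Parquet schema), the S3 prefixes raw/ and processed/, and the pre-processing itself.

```shell
# Writes the HQL script; runs it only when DRY_RUN=0 (on the EMR master node).
DRY_RUN="${DRY_RUN:-1}"
HQL_FILE="create_loan_tables.hql"

cat > "$HQL_FILE" <<'EOF'
-- External table over the raw Parquet files (illustrative columns only).
CREATE EXTERNAL TABLE IF NOT EXISTS loan_monthly_raw (
  loan_id            STRING,
  period             INT,
  current_upb        DOUBLE,
  delinquency_status STRING,
  loan_age           INT
)
STORED AS PARQUET
LOCATION 's3://freddie-mac-hgsh/raw/';

-- External table that receives the processed data, also in S3.
CREATE EXTERNAL TABLE IF NOT EXISTS loan_monthly (
  loan_id            STRING,
  period             INT,
  current_upb        DOUBLE,
  delinquency_status STRING,
  loan_age           INT,
  period_year        INT
)
STORED AS PARQUET
LOCATION 's3://freddie-mac-hgsh/processed/';

-- Hypothetical pre-processing: skip rows without a loan id and
-- derive the year from a YYYYMM period such as 200709.
INSERT OVERWRITE TABLE loan_monthly
SELECT loan_id, period, current_upb, delinquency_status, loan_age,
       CAST(period / 100 AS INT) AS period_year
FROM loan_monthly_raw
WHERE loan_id IS NOT NULL;
EOF

echo "wrote $HQL_FILE"
if [ "$DRY_RUN" = "0" ]; then hive -f "$HQL_FILE"; fi
```

Because both tables are external and point at S3, dropping them or terminating the cluster leaves the files in the bucket untouched.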
Once the above processing is done, we terminate the EMR cluster and launch a new one later for further analysis and processing.
4. Data Analysis and Optimization:
A new EMR cluster was launched as described previously. Two separate Git Bash sessions were used, one connected to Hadoop and the other to Hive. Once connected to Hive, we create a new database and a table over the processed data created earlier in S3.
a. The data has records from 2000 to 2017 as shown below.
b. Select all the monthly performance data for Loan ID F107Q1287729 in the year 2007.
c. For all the above queries, data is fetched from S3 because the table's data location is the S3 bucket. Instead of using S3, we can also move the data into Hive by creating another table, as shown below, for further processing.
d. Find the count of loan IDs with current loan delinquency status = 0 (delinquent for less than 30 days) in September 2010.
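Queries a to d might be written along the following lines. The column names (loan_id, period as YYYYMM, period_year, delinquency_status) and the managed table name are assumptions, since the actual schema is only visible in the screenshots. The script writes the HQL to a file and runs it only when `DRY_RUN=0` is set.

```shell
# Writes the HQL script; runs it only when DRY_RUN=0 (on the EMR master node).
DRY_RUN="${DRY_RUN:-1}"
HQL_FILE="loan_analysis.hql"

cat > "$HQL_FILE" <<'EOF'
-- a. Range of years covered by the data.
SELECT MIN(period_year), MAX(period_year) FROM loan_monthly;

-- b. All monthly records of one loan in 2007.
SELECT * FROM loan_monthly
WHERE loan_id = 'F107Q1287729' AND period_year = 2007;

-- c. Copy the S3-backed table into a Hive-managed table (hypothetical name).
CREATE TABLE loan_monthly_managed STORED AS PARQUET AS
SELECT * FROM loan_monthly;

-- d. Loans with delinquency status 0 in September 2010.
SELECT COUNT(loan_id) FROM loan_monthly_managed
WHERE delinquency_status = '0' AND period = 201009;
EOF

echo "wrote $HQL_FILE"
if [ "$DRY_RUN" = "0" ]; then hive -f "$HQL_FILE"; fi
```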
e. One way of optimizing Hive queries is to create partitioned tables. Partitions should be chosen based on the most common type of query run against the table; if this is not done carefully, partitioning can make queries more costly. Here we assume that our data is mostly queried by date or year, hence we create a table partitioned by year. This is dynamic partitioning, as the partition is derived from a data field in each row.
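A minimal sketch of dynamic partitioning, assuming the illustrative columns used here (the real table has more) and a source table loan_monthly that already contains period_year. The two SET statements enable dynamic partitions; the partition column must come last in the SELECT list.

```shell
# Writes the HQL script; runs it only when DRY_RUN=0 (on the EMR master node).
DRY_RUN="${DRY_RUN:-1}"
HQL_FILE="partition_loans.hql"

cat > "$HQL_FILE" <<'EOF'
-- Allow Hive to create partitions from the data itself.
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- period_year is declared as the partition column, not as a data column.
CREATE TABLE IF NOT EXISTS loan_by_yearly (
  loan_id            STRING,
  period             INT,
  current_upb        DOUBLE,
  delinquency_status STRING,
  loan_age           INT
)
PARTITIONED BY (period_year INT)
STORED AS PARQUET;

-- One partition (sub folder) is created per distinct period_year value.
INSERT OVERWRITE TABLE loan_by_yearly PARTITION (period_year)
SELECT loan_id, period, current_upb, delinquency_status, loan_age, period_year
FROM loan_monthly;

-- A filter on the partition column lets Hive read a single sub folder.
SELECT COUNT(loan_id) FROM loan_by_yearly
WHERE period_year = 2010 AND period = 201009 AND delinquency_status = '0';
EOF

echo "wrote $HQL_FILE"
if [ "$DRY_RUN" = "0" ]; then hive -f "$HQL_FILE"; fi
```

The last query only benefits from partitioning because it filters on period_year; a query without that filter still scans every partition.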
A look at YARN showed that no container had been allocated to the job.
Due to the limited memory in my EMR cluster, I deleted the table loan_monthly and created a new external table loan_monthly with the S3 bucket as its data location. The external table was then used to load data into the partitioned table loan_by_yearly. Along with the link, below are a few screenshots of YARN and the cluster nodes while the data was being loaded.
As we allocated only two core nodes when creating the EMR cluster, only those two were used while loading data into the partitioned table.
Subfolders are created for each value of period_year, the column on which the table was partitioned.
The column by which the table is partitioned appears as a column when the table is viewed or queried, but it is not actually stored in the data files.
Executing the same query as before took only 52 seconds instead of 575 seconds, which is roughly 11 times faster in this case.
The data is copied directly from EMR HDFS to S3, to be used later when EMR is launched the next time.
Later, when a new EMR cluster is launched, a new partitioned table is created using the data in the S3 location as below. This is called static partitioning.
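One way to restore such a table on a fresh cluster is sketched below: an external partitioned table is declared over the S3 folder, and each year from 2000 to 2017 is registered explicitly. The S3 prefix loan_by_yearly/ and the column list are assumptions. The script generates the HQL and runs it only when `DRY_RUN=0` is set.

```shell
# Generates the HQL script; runs it only when DRY_RUN=0 (on the EMR master node).
DRY_RUN="${DRY_RUN:-1}"
HQL_FILE="restore_partitions.hql"
S3_BASE="s3://freddie-mac-hgsh/loan_by_yearly"   # hypothetical prefix

cat > "$HQL_FILE" <<EOF
-- External partitioned table over the folders copied to S3 earlier.
CREATE EXTERNAL TABLE IF NOT EXISTS loan_by_yearly (
  loan_id            STRING,
  period             INT,
  current_upb        DOUBLE,
  delinquency_status STRING,
  loan_age           INT
)
PARTITIONED BY (period_year INT)
STORED AS PARQUET
LOCATION '${S3_BASE}/';
EOF

# Register one partition per year; each points at an existing sub folder.
year=2000
while [ "$year" -le 2017 ]; do
  echo "ALTER TABLE loan_by_yearly ADD IF NOT EXISTS PARTITION (period_year=$year) LOCATION '$S3_BASE/period_year=$year/';" >> "$HQL_FILE"
  year=$((year + 1))
done

# Sanity check: the count should match the processed table.
echo "SELECT COUNT(*) FROM loan_by_yearly;" >> "$HQL_FILE"

echo "wrote $HQL_FILE"
if [ "$DRY_RUN" = "0" ]; then hive -f "$HQL_FILE"; fi
```

When the sub folders follow the period_year=YYYY naming convention, `MSCK REPAIR TABLE loan_by_yearly;` is an alternative that discovers the partitions instead of listing them one by one.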
The total record count in the restored partitioned table equals the record count in the processed table, which is 1064685806 as shown in section 3.
Also from the above, the time taken for the count is nearly the same for the partitioned and the non-partitioned table, because the partitioning is by year and our count query is not year specific.
5. Clone an existing table:
LIKE can be used to clone the definition of an existing table. Only the schema is created and no data is transferred; data has to be loaded explicitly into the newly cloned table. IF NOT EXISTS avoids an error when creating a database or table if one with the same name already exists.
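A short sketch of cloning with LIKE. The database and table names are hypothetical, and the source table is assumed to live in the default database and to have a period_year column.

```shell
# Writes the HQL script; runs it only when DRY_RUN=0 (on the EMR master node).
DRY_RUN="${DRY_RUN:-1}"
HQL_FILE="clone_table.hql"

cat > "$HQL_FILE" <<'EOF'
-- IF NOT EXISTS makes both statements safe to re-run.
CREATE DATABASE IF NOT EXISTS freddie_clone;

-- Copies only the table definition; the new table starts empty.
CREATE TABLE IF NOT EXISTS freddie_clone.loan_monthly_copy
LIKE default.loan_monthly;

-- Data has to be loaded explicitly, here one year as an example.
INSERT INTO TABLE freddie_clone.loan_monthly_copy
SELECT * FROM default.loan_monthly WHERE period_year = 2007;
EOF

echo "wrote $HQL_FILE"
if [ "$DRY_RUN" = "0" ]; then hive -f "$HQL_FILE"; fi
```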
6. Complex and String Data Types:
Hive supports complex data types such as Array, Map and Struct. Twitter data collected using Kinesis Firehose has been stored in S3. This data is used to demonstrate creating tables, loading and querying complex data.
Create a temporary table in Hive to access the raw Twitter data. Data can also be loaded into a Hive table from S3 as shown below. The LOAD command moves (rather than copies) the data from the source to the target location.
Create complex data in a temp table by processing string data as below. If different complex data types are used in the same table, all of them must use the same terminator. Here “|” is used as the terminator in the array, map and struct.
Create a table with complex data types and copy the processed data to the complex table's data folder. COLLECTION ITEMS TERMINATED BY ‘|’ is used in the table definition because the data was created that way in the previous step.
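To make the terminators concrete, here is a small self-contained sketch. The table name, its columns and the two sample rows are made up for illustration and are not the actual Twitter data; LOAD DATA LOCAL is used here in place of copying the file into the table folder.

```shell
# Writes a sample data file and an HQL script; runs Hive only when DRY_RUN=0.
DRY_RUN="${DRY_RUN:-1}"

# Fields are separated by ",", collection items by "|", map keys by ":".
cat > tweets_complex.txt <<'EOF'
1,hadoop|hive|emr,lang:en|source:web,abhi|120
2,aws|s3,lang:en|source:mobile,sam|45
EOF

cat > complex_types.hql <<'EOF'
-- Array, map and struct all share the "|" item terminator.
CREATE TABLE IF NOT EXISTS tweets_complex (
  id       INT,
  hashtags ARRAY<STRING>,
  attrs    MAP<STRING,STRING>,
  author   STRUCT<name:STRING, followers:INT>
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  COLLECTION ITEMS TERMINATED BY '|'
  MAP KEYS TERMINATED BY ':'
STORED AS TEXTFILE;

-- LOCAL copies the file from the current working directory.
LOAD DATA LOCAL INPATH 'tweets_complex.txt' INTO TABLE tweets_complex;
EOF

echo "wrote tweets_complex.txt and complex_types.hql"
if [ "$DRY_RUN" = "0" ]; then hive -f complex_types.hql; fi
```

In the first sample row, the second field becomes the array [hadoop, hive, emr], the third the map {lang: en, source: web} and the fourth the struct (name = abhi, followers = 120).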
Complex data is accessed from Hive tables as shown below.
The explode function can also be used, but only on Array and Map columns, as shown below.
When the explode function is used along with other columns, Lateral View is needed as below. Although the aliases la and lm are not referenced anywhere else, they must still be defined with Lateral View.
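The access patterns above can be sketched as follows, assuming a hypothetical table tweets_complex with the columns id INT, hashtags ARRAY<STRING>, attrs MAP<STRING,STRING> and author STRUCT<name:STRING, followers:INT>.

```shell
# Writes the HQL script; runs it only when DRY_RUN=0 (on the EMR master node).
DRY_RUN="${DRY_RUN:-1}"
HQL_FILE="query_complex.hql"

cat > "$HQL_FILE" <<'EOF'
-- Direct access: array by index, map by key, struct by field name.
SELECT id, hashtags[0], attrs['lang'], author.name FROM tweets_complex;

-- explode on its own: one output row per array element or map entry.
SELECT explode(hashtags) AS tag FROM tweets_complex;
SELECT explode(attrs) AS (attr_key, attr_value) FROM tweets_complex;

-- explode next to other columns needs LATERAL VIEW with an alias (la, lm).
SELECT id, tag
FROM tweets_complex
LATERAL VIEW explode(hashtags) la AS tag;

SELECT id, attr_key, attr_value
FROM tweets_complex
LATERAL VIEW explode(attrs) lm AS attr_key, attr_value;
EOF

echo "wrote $HQL_FILE"
if [ "$DRY_RUN" = "0" ]; then hive -f "$HQL_FILE"; fi
```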
7. Working with Views:
A view is a saved query that can be queried like a table. If a complex query is used again and again, it can be saved as a view to save time. A view is visible as a table in the database.
Non-materialized views are created by default in Hive; they do not save the underlying query data. Hive merges the operations of the view with the operations in the outer query into a single set and optimizes them before execution. A materialized view, in contrast, saves the query data to the database, which makes it faster to query. Hence, when the underlying data changes, a non-materialized view reflects the change automatically while a materialized view needs to be rebuilt.
Order By is an expensive operation and should never be used inside a view. If sorting is required, it should be included in the query run against the view.
Create the table dayofweek from data present in S3.
A non-materialized and a materialized view were created using the processed data and the day-of-week data as shown below. Creation is quick for the non-materialized view, while queries are quick for the materialized view.
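A simplified sketch of the two kinds of view. It uses only a hypothetical loan_monthly table with a period_year column, because the schema of dayofweek is not shown here; the view names are made up. Materialized views need Hive 3.0 or later, and whether DISABLE REWRITE is required depends on the Hive version and on whether the source table is transactional, so treat that keyword as an assumption.

```shell
# Writes the HQL script; runs it only when DRY_RUN=0 (on the EMR master node).
DRY_RUN="${DRY_RUN:-1}"
HQL_FILE="views.hql"

cat > "$HQL_FILE" <<'EOF'
-- Non-materialized view: only the query text is stored.
CREATE VIEW IF NOT EXISTS loans_per_year_v AS
SELECT period_year, COUNT(*) AS record_count
FROM loan_monthly
GROUP BY period_year;

-- Materialized view: the query result is stored as data.
CREATE MATERIALIZED VIEW IF NOT EXISTS loans_per_year_mv DISABLE REWRITE AS
SELECT period_year, COUNT(*) AS record_count
FROM loan_monthly
GROUP BY period_year;

-- Sorting belongs in the query on the view, not in the view itself.
SELECT * FROM loans_per_year_v ORDER BY period_year;
SELECT * FROM loans_per_year_mv ORDER BY period_year;

-- After the underlying data changes, refresh the materialized view.
ALTER MATERIALIZED VIEW loans_per_year_mv REBUILD;
EOF

echo "wrote $HQL_FILE"
if [ "$DRY_RUN" = "0" ]; then hive -f "$HQL_FILE"; fi
```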
Both views are visible as tables in the database, but a data folder is created only for the materialized view.
Views can also be modified in the same way as tables, as depicted below.
8. Bootstrap during launch of EMR cluster:
Hive queries can be saved to S3, and the script location can be used in a bootstrap action when launching the EMR cluster. All the respective scripts are then executed automatically as the cluster is launched.
9. Data transfer between HDFS and RDBMS:
Sqoop, run from the Linux command line, is used to transfer data between HDFS and an RDBMS, for both import and export.
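Until more details are added, here is a hedged sketch of what an import and an export could look like. The JDBC URL, user name, table names and HDFS folders are all placeholders, and Sqoop is assumed to be installed on the cluster. The `run` helper prints the commands unless `DRY_RUN=0` is set.

```shell
# Dry run by default; set DRY_RUN=0 on the EMR master node to execute.
DRY_RUN="${DRY_RUN:-1}"
run() {
  echo "+ $*"
  if [ "$DRY_RUN" = "0" ]; then "$@"; fi
}

# Placeholder connection details for a MySQL database.
JDBC_URL="jdbc:mysql://db-host.example.com:3306/freddie"
DB_USER="dbuser"

# Import: RDBMS table -> HDFS folder (-P prompts for the password).
run sqoop import --connect "$JDBC_URL" --username "$DB_USER" -P \
  --table dayofweek --target-dir /user/hadoop/dayofweek --num-mappers 1

# Export: HDFS folder -> existing RDBMS table.
run sqoop export --connect "$JDBC_URL" --username "$DB_USER" -P \
  --table loan_summary --export-dir /user/hadoop/loan_summary \
  --input-fields-terminated-by ','
```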
More details to be updated. Your thoughts will be appreciated.
Thanks, Abhi.