An Introduction to Big Data & ML Pipeline in AWS.
This story walks through an easy path to the following in AWS:
- Build a Big Data Pipeline for both Static and Streaming Data.
- Process Data in Apache Hadoop using Hive.
- Load processed data to a Data Warehouse solution like Redshift and an RDS database like MySQL.
- Collect Streaming Data using AWS services like Kinesis Data Streams and Kinesis Firehose.
- Run Real-Time Analysis on Live Streaming Data using Kinesis Data Analytics.
- Build and Deploy an Image Classifier ML Model with SageMaker and an API.
- Build and Deploy a Real-Time Web App using Flask to infer from the Image Classifier.
Contents:
- Static Data Collection and Pre-Processing Pipeline.
- Streaming Data Collection and Pre-Processing Pipeline.
- Build, Deploy, Monitor and Infer an ML Model.
Tools/Software Used:
- Services — EC2, EMR, Hive, Kinesis Data Streams, Firehose & Analytics, Lambda, SageMaker, API Gateway, CloudWatch, Jupyter Notebook.
- Storage — S3, HDFS, RDS and Redshift.
- Languages — FS Shell, HQL, SQL and Python.
Static data is collected from the Freddie Mac Single Family Loan-Level dataset, which has more than one billion records and is greater than 80 GB in size. EMR and Hive are used to collect and pre-process the data. Processed data is then loaded to S3 for machine learning. Along with S3, the processed data is also loaded to MySQL and Redshift, from where it can be used to build reports and dashboards.
Streaming data is collected from a live data generator and consumed using a Kinesis Data Stream. Kinesis Data Analytics is used for real-time data analysis and transformation using SQL. Lambda is used in the next step for data transformation, and Firehose writes the final data to S3. A similar pipeline can be used for server log analysis or Twitter analysis.
As part of ML, an Image Classifier is trained, built and deployed to classify an image in real time using AWS SageMaker. A front-end web app is also developed with Flask for real-time inference from outside AWS.
1. Static Data Collection and Pre-Processing:
Since we are dealing with 80 GB of raw data, EMR and Hive are used for pre-processing. Large amounts of data can also be pre-processed using Spark on EMR, which is much faster than Hive because Spark uses memory instead of disk for intermediate processing. The pre-processing steps can be modified as per the specific machine learning requirement.
The steps below are followed for collecting and pre-processing the static data:
Step — 1: Web-scraped the Freddie Mac data website to find the exact location of each quarterly file from 1999 to 2017. All the files were downloaded and extracted to a local PC; it took around 60 minutes and the total size was 81 GB. The PC was used instead of EC2 or EMR to save a few bucks. An account needs to be created with Freddie Mac to download the data.
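For reference, a minimal download sketch in Python is below. The URL pattern and file names are illustrative assumptions; the real links sit behind the Freddie Mac login, so the authenticated session is left as a placeholder.

```python
import io
import zipfile

import requests

# Hypothetical URL pattern; the real quarterly links are scraped from the
# Freddie Mac site after login, so treat this as an illustrative sketch.
BASE_URL = "https://freddiemac.embs.com/FLoan/Data/historical_data1_Q{q}{year}.zip"

session = requests.Session()
# ... authenticate the session with Freddie Mac credentials here ...

for year in range(1999, 2018):
    for q in range(1, 5):
        resp = session.get(BASE_URL.format(q=q, year=year), timeout=120)
        resp.raise_for_status()
        # Each quarterly download is a zip of delimited .txt files.
        with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
            zf.extractall(f"data/{year}Q{q}")
```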
Step — 2: The raw .txt files were then loaded to S3 from the local PC using AWS CLI commands. It took around 60 minutes to transfer the raw data to S3.
Step — 3: External tables for both the raw and processed files are created in Hive with S3 as the file location. External tables are used to retain the data even after table deletion. The LOCATION parameter is used because the raw data is already in S3 and the processed data needs to be moved to S3 for machine learning.
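The author ran the HQL on EMR; below is a minimal sketch of the idea using PyHive against the EMR master node. The host, bucket and column list are illustrative, not the full Freddie Mac schema.

```python
from pyhive import hive  # pip install pyhive[hive]

# Connect to HiveServer2 on the EMR master node (hostname is illustrative).
conn = hive.Connection(host="emr-master-node", port=10000, username="hadoop")
cursor = conn.cursor()

# EXTERNAL + LOCATION keeps the data in S3 even if the table is dropped.
cursor.execute("""
CREATE EXTERNAL TABLE IF NOT EXISTS acquisition_raw (
    credit_score        INT,
    first_payment_date  STRING,
    loan_seq_no         STRING,
    orig_upb            DECIMAL(12, 2)
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
LOCATION 's3://my-loan-bucket/raw/acquisition/'
""")
```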
Step — 4: On the acquisition data, pre-processing like date conversion is performed. It took around 140 seconds to load 25 million records into the processed acquisition table with S3 as the data location. The sizes of the raw and processed acquisition files are 3.7 and 3.8 GB respectively.
The acquisition data is also loaded into a table partitioned by year, an optimization technique for faster Hive queries, as sketched below.
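A sketch of the yearly partitioning, assuming an illustrative acquisition_part table created with PARTITIONED BY (orig_year STRING):

```python
from pyhive import hive

cursor = hive.Connection(host="emr-master-node", port=10000,
                         username="hadoop").cursor()

# Dynamic partitioning must be enabled before a partitioned INSERT ... SELECT.
cursor.execute("SET hive.exec.dynamic.partition=true")
cursor.execute("SET hive.exec.dynamic.partition.mode=nonstrict")

# The partition column (orig_year) must come last in the SELECT list.
cursor.execute("""
INSERT OVERWRITE TABLE acquisition_part PARTITION (orig_year)
SELECT credit_score, first_payment_date, loan_seq_no, orig_upb,
       substr(first_payment_date, 1, 4) AS orig_year
FROM acquisition_processed
""")
```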
On the performance data, pre-processing like date conversion and ordering is performed. It took around 3.5 hours to load 1.3 billion records into the processed performance table. The sizes of the raw and processed performance files are 78 and 140 GB respectively. Other pre-processing steps can be used for specific use cases.
Step — 5: A MySQL table was created in AWS RDS and the data was loaded from S3 using Sqoop on the EMR Linux node. It took around 723 seconds to transfer 3.7 GB of processed acquisition data containing 25 million rows. A few errors occurred because spaces were present both as the line terminator and in field values; loading succeeded after reformatting the data.
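Sqoop is a CLI tool run on the EMR master node; a thin Python wrapper sketch is below, with the RDS endpoint, credentials and delimiter as placeholders.

```python
import subprocess

# Placeholders throughout -- substitute the real RDS endpoint, database,
# credentials file and field delimiter used by the processed files.
subprocess.run([
    "sqoop", "export",
    "--connect", "jdbc:mysql://my-rds-endpoint:3306/loans",
    "--username", "admin",
    "--password-file", "hdfs:///user/hadoop/.mysql-password",
    "--table", "acquisition",
    "--export-dir", "s3://my-loan-bucket/processed/acquisition/",
    "--input-fields-terminated-by", ",",
], check=True)
```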
Step — 6: A Redshift table was created and the processed data was pulled into Redshift from S3 using the COPY command. It took around 3 hours to pull 140 GB of processed performance data. UNLOAD can be used for the reverse scenario.
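A minimal sketch of the COPY step via psycopg2; the cluster endpoint, credentials and IAM role ARN are placeholders.

```python
import psycopg2  # pip install psycopg2-binary

# Placeholder connection details for the Redshift cluster.
conn = psycopg2.connect(
    host="my-cluster.abc123.us-east-1.redshift.amazonaws.com",
    port=5439, dbname="loans", user="awsuser", password="...")

with conn, conn.cursor() as cur:
    # COPY parallelises the S3 load across the cluster's slices.
    cur.execute("""
        COPY performance
        FROM 's3://my-loan-bucket/processed/performance/'
        IAM_ROLE 'arn:aws:iam::123456789012:role/myRedshiftRole'
        DELIMITER ',';
    """)
```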
Languages used are Python, AWS CLI, HQL, SQL and Sqoop. Git Link for Static Data.
2. Streaming Data Collection and Pre-Processing:
Dummy data is collected from a random-user stream URL. The AWS Kinesis family is used to collect and pre-process the stream data. A similar pipeline can be used to consume large amounts of live data for any specific use case. Here the data is stored in S3 for the subsequent ML process, but Firehose can also route processed data to RDS, Redshift, DynamoDB and EMR. Firehose can also consume the stream directly, avoiding any latency introduced by Data Streams and Analytics, at the cost of not retaining the raw stream data.
Step — 1: A Data Stream is created using the AWS Kinesis console. The number of shards used is one, since the streaming data here is less than 1 MB/sec. A Data Stream uses shards to collect and transfer data.
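The stream was created in the console; an equivalent boto3 sketch (stream name and region assumed) looks like this:

```python
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

# One shard: a single shard accepts up to 1 MB/sec (or 1,000 records/sec) on write.
kinesis.create_stream(StreamName="live-user-stream", ShardCount=1)
```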
Step — 2: A Python script executed on the local PC with AWS credentials reads data from the live stream and writes it to the Data Stream. Once the script is triggered successfully, the Kinesis Data Stream starts receiving records, which can be validated from the Data Stream monitoring console. In the script, put_record is used with the data record and a partition key to load data into the Data Stream.
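A minimal producer sketch is below, using the public randomuser.me API as a stand-in for the live data generator; the stream name and region are assumptions.

```python
import json
import time

import boto3
import requests

kinesis = boto3.client("kinesis", region_name="us-east-1")

# Illustrative public test API standing in for the live data generator.
SOURCE_URL = "https://randomuser.me/api/"

while True:
    user = requests.get(SOURCE_URL, timeout=10).json()["results"][0]
    kinesis.put_record(
        StreamName="live-user-stream",
        Data=json.dumps(user).encode("utf-8"),  # record payload
        PartitionKey=user["login"]["uuid"],     # decides the target shard
    )
    time.sleep(1)  # throttle the polling loop
```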
Step — 3: Kinesis Analytics is used to pull data from the Kinesis Data Stream, execute real-time queries using SQL and transform the data as required. Analytics can usually determine the schema of the input records automatically; if not, a custom schema can be defined.
Post transformation, Kinesis Analytics creates two different streams of data: one for the SQL-transformed data and one for errors. These two streams are delivered to separate Firehose Delivery Streams for further routing.
Step — 4: Once data is pushed to the Firehose Delivery Stream, a Lambda function (a Python script) is invoked to perform further transformation on the records that passed the SQL transformation. In this case, a newline character is added after each record. After transformation, the Lambda function hands the transformed records back to Firehose for further transfer. Failure records from the previous step are written directly to S3 for later processing.
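Firehose's transformation contract expects each returned record to echo the incoming recordId along with a result status and Base64-encoded data. A minimal sketch of the newline-appending Lambda:

```python
import base64


def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        # Firehose hands each record in Base64; decode, transform, re-encode.
        payload = base64.b64decode(record["data"]).decode("utf-8")
        transformed = payload + "\n"  # newline so records land line-by-line in S3
        output.append({
            "recordId": record["recordId"],  # must echo the incoming id
            "result": "Ok",                  # "Ok", "Dropped" or "ProcessingFailed"
            "data": base64.b64encode(transformed.encode("utf-8")).decode("utf-8"),
        })
    return {"records": output}
```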
Once the buffer size defined during Delivery Stream setup is reached, the Delivery Stream loads the processed data to S3. Any records that fail the Lambda transformation are also written to S3 by the Delivery Stream.
Once all the setup is done correctly, the transformed data reaches S3. Failure records, whether from the SQL transformation or from the Lambda transformation, are also written to S3 for further processing.
Languages used are Python and SQL. All Kinesis family setup is done using the AWS console. Git Link for Streaming Data.
3. Build, Deploy, Monitor and Infer an ML Model:
As part of machine learning, an image classifier is built to classify whether an image is a Penguin or Not Penguin. The final model is deployed and inferred in real time from a web application.
Step — 1: Raw image data is collected from Google and saved in S3. A SageMaker Jupyter notebook is used to perform analysis and pre-processing like resizing, formatting, synthesizing more samples, and splitting the data into training and validation lists. After processing, the data along with the .lst files is stored in S3 for further use. 125 raw images were converted into 2,500 processed samples.
When training with actual image files, the AWS Image Classification algorithm requires training and validation lists along with the actual images. A .lst file is a list file holding the serial number, label and location of each image. Please refer to the AWS documentation for each built-in algorithm's input and output formats, which are algorithm-specific. Pre-processing should be done in a Linux environment, as the .lst files are created with exact folder locations using "/" as the folder separator and are consumed by SageMaker, which also runs on Linux.
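A minimal sketch of the resize step and .lst generation is below. Each .lst line is tab-separated: serial number, numeric label, then the image path relative to the channel root. The folder name, image size and labeling rule are illustrative.

```python
import os

from PIL import Image  # pip install pillow

IMG_DIR, SIZE = "images", (224, 224)  # dimension expected by the classifier

with open("train.lst", "w") as lst:
    for idx, name in enumerate(sorted(os.listdir(IMG_DIR))):
        path = os.path.join(IMG_DIR, name)
        # Normalise every image to RGB at a fixed size, overwriting in place.
        Image.open(path).convert("RGB").resize(SIZE).save(path)
        label = 1 if name.startswith("penguin") else 0  # illustrative labeling
        # .lst format: index<TAB>label<TAB>relative-path
        lst.write(f"{idx}\t{label}\t{name}\n")
```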
Step — 2: Once the training and validation data are in S3, model training is performed by triggering a training job with SageMaker Training. Parameters like the algorithm's ECR details, EC2 instance type, hyperparameters, training channels and output S3 location are provided while triggering the training job.
The details used here are the SageMaker built-in Image Classification algorithm, the GPU instance ml.p2.xlarge, and hyperparameters like the number of classes, image dimensions, training instance count, etc. As required by AWS, four channels (train, validation, train_lst and validation_lst) are defined with exact S3 locations. The output location is provided as an S3 bucket to store the final model artifacts. It took around one hour for the training job to train and store the 6 GB model artifacts to S3, with training and validation accuracy of 97% and 94% respectively.
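A trimmed boto3 sketch of such a training job is below. The ECR image URI, role ARN, bucket and hyperparameter values are placeholders; the exact Image Classification image URI varies by region.

```python
import boto3

sm = boto3.client("sagemaker", region_name="us-east-1")


def channel(name, s3_uri):
    # All four Image Classification channels use the same content type.
    return {
        "ChannelName": name,
        "ContentType": "application/x-image",
        "DataSource": {"S3DataSource": {
            "S3DataType": "S3Prefix", "S3Uri": s3_uri,
            "S3DataDistributionType": "FullyReplicated"}},
    }


sm.create_training_job(
    TrainingJobName="penguin-classifier-1",
    AlgorithmSpecification={
        # Region-specific ECR image for built-in Image Classification (placeholder).
        "TrainingImage": "811284229777.dkr.ecr.us-east-1.amazonaws.com/image-classification:1",
        "TrainingInputMode": "File"},
    RoleArn="arn:aws:iam::123456789012:role/SageMakerRole",
    HyperParameters={"num_classes": "2", "image_shape": "3,224,224",
                     "num_training_samples": "2000", "epochs": "30"},
    InputDataConfig=[
        channel("train", "s3://my-ml-bucket/train/"),
        channel("validation", "s3://my-ml-bucket/validation/"),
        channel("train_lst", "s3://my-ml-bucket/train_lst/"),
        channel("validation_lst", "s3://my-ml-bucket/validation_lst/"),
    ],
    OutputDataConfig={"S3OutputPath": "s3://my-ml-bucket/output/"},
    ResourceConfig={"InstanceType": "ml.p2.xlarge", "InstanceCount": 1,
                    "VolumeSizeInGB": 50},
    StoppingCondition={"MaxRuntimeInSeconds": 86400},
)
```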
Step — 3: After the artifacts are saved to S3, SageMaker Inference is used for online deployment. Deploying an online model in AWS requires three steps: model creation, endpoint configuration and endpoint creation. For model creation, details like the exact location of the ECR container used for training and the S3 location of the trained model artifacts are provided. Once the model is created, an endpoint configuration is defined, which takes details like the created model, instance type and instance count; here ml.m4.xlarge is used. Finally, an endpoint is created using the configuration defined in the previous step.
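A boto3 sketch of the three deployment steps, with the image URI, role ARN and S3 paths as placeholders:

```python
import boto3

sm = boto3.client("sagemaker", region_name="us-east-1")

# 1. Model: ties the inference container to the trained artifacts in S3.
sm.create_model(
    ModelName="penguin-classifier",
    PrimaryContainer={
        "Image": "811284229777.dkr.ecr.us-east-1.amazonaws.com/image-classification:1",
        "ModelDataUrl": "s3://my-ml-bucket/output/penguin-classifier-1/output/model.tar.gz"},
    ExecutionRoleArn="arn:aws:iam::123456789012:role/SageMakerRole")

# 2. Endpoint configuration: which model, on what hardware, how many instances.
sm.create_endpoint_config(
    EndpointConfigName="penguin-classifier-config",
    ProductionVariants=[{"VariantName": "AllTraffic",
                         "ModelName": "penguin-classifier",
                         "InstanceType": "ml.m4.xlarge",
                         "InitialInstanceCount": 1}])

# 3. Endpoint: the live HTTPS endpoint serving real-time inference.
sm.create_endpoint(EndpointName="penguin-classifier-endpoint",
                   EndpointConfigName="penguin-classifier-config")
```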
Step — 3.1: Once successfully created, the endpoint can be accessed directly from within the AWS network using "aws sagemaker-runtime invoke-endpoint". The output is a JSON document with the probability of each class the object may belong to, as below.
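The boto3 equivalent from inside AWS looks like this (endpoint and file names assumed):

```python
import json

import boto3

runtime = boto3.client("sagemaker-runtime", region_name="us-east-1")

with open("test-image.jpg", "rb") as f:
    response = runtime.invoke_endpoint(
        EndpointName="penguin-classifier-endpoint",
        ContentType="application/x-image",  # raw image bytes as the payload
        Body=f.read())

# The response body is a JSON list of per-class probabilities.
probabilities = json.loads(response["Body"].read())
print(probabilities)
```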
Step — 3.2: To access the SageMaker endpoint from outside AWS, Lambda and API Gateway are used. In Lambda, a Python script is defined that accepts a Base64-encoded string, decodes it to create the payload and invokes the SageMaker endpoint for inference. Once the result is returned, the class probabilities are compared and the actual class name, Penguin or Not Penguin, is returned. The most important part here is defining Lambda's security roles and policies so it can access SageMaker. Next, a REST API with a POST method is created; while creating the method, it is integrated with the previously created Lambda function. Once the method is created, the API is deployed and can then be accessed from anywhere.
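A minimal sketch of such a Lambda handler, assuming Lambda proxy integration (so the image arrives Base64-encoded in event["body"]) and that class index 1 corresponds to Penguin:

```python
import base64
import json

import boto3

runtime = boto3.client("sagemaker-runtime")


def lambda_handler(event, context):
    # Assumes proxy integration: the request body is the Base64-encoded image.
    image_bytes = base64.b64decode(event["body"])

    response = runtime.invoke_endpoint(
        EndpointName="penguin-classifier-endpoint",
        ContentType="application/x-image",
        Body=image_bytes)
    probabilities = json.loads(response["Body"].read())

    # Class order assumed to match the training labels: 0 = Not Penguin, 1 = Penguin.
    label = "Penguin" if probabilities[1] > probabilities[0] else "Not Penguin"
    return {"statusCode": 200, "body": json.dumps({"result": label})}
```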
Step — 3.3: To access the API by uploading an image, a small web app was developed using Flask. The web app takes an image upload from the user, processes the input image into the format the ML model expects, encodes it to a Base64 string, invokes the endpoint and finally displays the result by decoding the JSON response from the AWS API.
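A minimal Flask sketch is below; the API Gateway invoke URL is a placeholder, and a bare inline form stands in for a real template.

```python
import base64

import requests
from flask import Flask, request

app = Flask(__name__)

# Placeholder for the deployed API Gateway invoke URL.
API_URL = "https://abc123.execute-api.us-east-1.amazonaws.com/prod/classify"


@app.route("/", methods=["GET", "POST"])
def index():
    if request.method == "POST":
        # Read the uploaded image and Base64-encode it for the API body.
        encoded = base64.b64encode(request.files["image"].read()).decode("utf-8")
        result = requests.post(API_URL, data=encoded, timeout=30).json()["result"]
        return f"Prediction: {result}"
    # Minimal upload form in place of a real template.
    return ('<form method="post" enctype="multipart/form-data">'
            '<input type="file" name="image"><input type="submit"></form>')


if __name__ == "__main__":
    app.run(debug=True)
```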
The language used is Python. All SageMaker setup was done using the AWS console. Git Link for Machine Learning.
Kindly visit GitHub for all the necessary scripts.
This marks the end of An Introduction to Big Data & ML Pipeline in AWS.
* AWS services used here will incur charges.
Thanks for reading. Questions and Suggestions are welcome.
Regards
Abhi.