Introduction to Machine Learning in AWS
Fully automated end-to-end data science project using python programming and Amazon Web Services
Overview
This article will help you gain all the skills required to get started with a machine learning project in AWS. Additional documentation and resources are referenced throughout to take your project to the next level.
It is important to understand the different elements of a data science project. We will go through each service area of this project step by step and understand its usage, importance, requirements, and contribution to the project.
Architecture
1. Interacting with Amazon Web Services
One of the major challenges of working in a cloud environment is understanding how to interact with each service. You will definitely want to know when to use a service and when to shut it down, since AWS uses a pay-per-use billing model to charge its customers.
To interact with AWS services we will use Boto3, the AWS Software Development Kit (SDK) for Python.
SDK: A software development kit (SDK) is a collection of software development tools in one installable package. An SDK provides a set of tools, libraries, relevant documentation, code samples, processes, and/or guides that allow developers to create software applications on a specific platform.
Think of it as a tool kit for putting together the program or code for your project.
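To see what interacting with a service through the SDK looks like in practice, here is a minimal sketch that lists the S3 buckets visible to your account (it assumes your AWS credentials are already configured, which is covered in the configuration section below).
# Minimal boto3 sketch: list the S3 buckets visible to your credentials.
import boto3

s3 = boto3.client('s3')
for bucket in s3.list_buckets()['Buckets']:
    print(bucket['Name'])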
For the sake of this project we will only discuss the following services.
Amazon S3: object storage service offering industry-leading scalability, data availability, security, and performance.
We will be using Amazon S3 for storing our raw data, as well as storing and retrieving the processed data for analysis and training machine learning models.
Amazon SageMaker: helps data scientists and developers to prepare, build, train, and deploy high-quality machine learning models quickly by bringing together a broad set of capabilities purpose-built for ML.
AWS Lambda: Build and run applications without thinking about servers. Run code without provisioning or managing infrastructure.
Execute code at the capacity you need, as you need it. Scale to match your data volume automatically and enable custom event triggers.
Amazon CloudWatch: Amazon CloudWatch Events delivers a near real-time stream of system events that describe changes in Amazon Web Services (AWS) resources.
Using simple rules that you can quickly set up, you can match events and route them to one or more target functions or streams.
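As a rough sketch of how such a rule can be created from code, the snippet below sets up a scheduled CloudWatch Events rule and points it at a Lambda function; the rule name, schedule, and function ARN are placeholders, and the Lambda function additionally needs a resource-based permission that allows events.amazonaws.com to invoke it.
import boto3

events = boto3.client('events')

# Hypothetical rule: trigger the pre-processing Lambda once a day.
events.put_rule(
    Name='daily-preprocessing-trigger',
    ScheduleExpression='rate(1 day)',
    State='ENABLED')

# Point the rule at the Lambda function (placeholder ARN).
events.put_targets(
    Rule='daily-preprocessing-trigger',
    Targets=[{
        'Id': 'preprocessing-lambda',
        'Arn': 'arn:aws:lambda:us-east-1:123456789012:function:pre-processing'}])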
2. Setting up configurations
Before moving to the next step, if you are not familiar with setting up environment variables for AWS, please refer to this documentation on how to configure AWS credentials for using the SDK. (Link to the documentation)
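As a quick orientation, boto3 picks up credentials from environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION) or from the shared credentials file created by aws configure. The sketch below shows how to pin a specific named profile and region explicitly; the profile name is a placeholder.
import boto3

# Use a named profile from ~/.aws/credentials instead of the default credential chain.
session = boto3.Session(profile_name='my-ml-project', region_name='us-east-1')
s3 = session.client('s3')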
We have split our code into two different scripts.
pre-processing.py
The first script, “pre-processing”, will convert the raw data from the S3 source into the desired format and structure, and then write it back to an optimized location for easy access. This script will reside inside the AWS Lambda function.
ml-model.ipynb
The second script, “ml-model”, will ingest the structured data and utilize open-source machine learning packages such as scikit-learn, Keras, PyTorch, and TensorFlow, or AWS-native ML packages for SageMaker. This script will reside inside an Amazon SageMaker instance.
Uploading Data (S3)
Method 1 — Use AWS console
Method 2 — Use SDK support
# Uploading a sample text file to S3.
import boto3
s3 = boto3.resource('s3')
s3.meta.client.upload_file('/tmp/hello.txt', 'mybucket', 'hello.txt')
Data Ingestion
Reading CSV file from S3 using Boto3
import pandas as pd
import boto3
bucket = "your_bucket"
file_name = "your_file.csv"
s3 = boto3.client('s3')
obj = s3.get_object(Bucket=bucket, Key=file_name)
df = pd.read_csv(obj['Body'])
Reading JSON file from S3 using Boto3
import pandas as pd
import boto3

bucket = "your_bucket"
file_name = "your_file.json"

s3 = boto3.client('s3')
obj = s3.get_object(Bucket=bucket, Key=file_name)
df = pd.read_json(obj['Body'])
Reading Parquet file from S3 using pyarrow
# pip install pyarrow
# pip install s3fs
import s3fs
import pyarrow.parquet as pq
fs = s3fs.S3FileSystem()
bucket = 'your_bucket'
path = 'directory_name/your_file.parquet'
bucket_uri = f's3://{bucket}/{path}'
dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
table = dataset.read()
df = table.to_pandas()
3. Working with Data
Once we have uploaded our data to the S3 location, we can start working on the pre-processing script. This script, which turns the raw data into structured data, can reside either in the Lambda function or in a SageMaker notebook, and will be triggered by an Amazon CloudWatch rule every time new raw data is uploaded.
For the sake of simplicity, we will create two S3 buckets to store raw and processed data separately. Since every dataset comes with its own unique challenges, we can follow some general rules to cover most of the areas where data anomalies exist (a short pandas sketch of these checks follows the list):
- Checking for missing or duplicate data
- Fixing incorrect data types
- Indexing
- Aggregation or normalization
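Here is a minimal pandas sketch of these general checks; the file and column names are purely illustrative.
import pandas as pd

df = pd.read_csv("raw_data.csv")

# Missing / duplicate data
df = df.drop_duplicates()
df = df.dropna(subset=["quantity"])  # or fillna(...) where a default makes sense

# Incorrect data types
df["order_date"] = pd.to_datetime(df["order_date"])
df["quantity"] = df["quantity"].astype("int64")

# Indexing
df = df.set_index("order_id")

# Aggregation or normalization
daily_totals = df.groupby("order_date")["quantity"].sum()
df["quantity_norm"] = (df["quantity"] - df["quantity"].mean()) / df["quantity"].std()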
Here is an example that reads a raw file from the raw S3 bucket, performs some transformations on it, and writes it back to another S3 bucket in a well-structured format.
# Read file from raw bucket and write back structured data to processed bucket.
import os
from io import StringIO

import pandas as pd
import boto3


def lambda_handler(event, context):
    bucket = "raw_bucket"
    file_name = "your_file.csv"
    s3 = boto3.client('s3')
    obj = s3.get_object(Bucket=bucket, Key=file_name)
    df = pd.read_csv(obj['Body'])

    # Pre-Processing Steps

    bucket = "processed_bucket"
    csv_buffer = StringIO()
    df.to_csv(csv_buffer)
    s3_resource = boto3.resource('s3')
    s3_resource.Object(bucket, file_name).put(Body=csv_buffer.getvalue())
We can take this same approach and utilize Amazon SageMaker functionalities to pre-process the raw data.
a) Create preprocessing script
from __future__ import print_function

import time
import sys
from io import StringIO
import os
import shutil

import argparse
import csv
import json
import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.externals import joblib
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import Binarizer, StandardScaler, OneHotEncoder

from sagemaker_containers.beta.framework import (
    content_types, encoders, env, modules, transformer, worker)

# Since we get a headerless CSV file we specify the column names here.
feature_columns_names = [
    'sex',             # M, F, and I (infant)
    'length',          # Longest shell measurement
    'diameter',        # perpendicular to length
    'height',          # with meat in shell
    'whole_weight',    # whole abalone
    'shucked_weight',  # weight of meat
    'viscera_weight',  # gut weight (after bleeding)
    'shell_weight']    # after being dried

label_column = 'rings'

feature_columns_dtype = {
    'sex': str,
    'length': np.float64,
    'diameter': np.float64,
    'height': np.float64,
    'whole_weight': np.float64,
    'shucked_weight': np.float64,
    'viscera_weight': np.float64,
    'shell_weight': np.float64}

label_column_dtype = {'rings': np.float64}  # +1.5 gives the age in years


def merge_two_dicts(x, y):
    z = x.copy()   # start with x's keys and values
    z.update(y)    # modifies z with y's keys and values & returns None
    return z


if __name__ == '__main__':

    parser = argparse.ArgumentParser()

    # Sagemaker specific arguments. Defaults are set in the environment variables.
    parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])

    args = parser.parse_args()

    # Take the set of files and read them all into a single pandas dataframe
    input_files = [os.path.join(args.train, file) for file in os.listdir(args.train)]
    if len(input_files) == 0:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(args.train, "train"))

    raw_data = [pd.read_csv(
        file,
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype)) for file in input_files]
    concat_data = pd.concat(raw_data)

    # We will train our classifier with the following features:
    # Numeric Features:
    # - length: Longest shell measurement
    # - diameter: Diameter perpendicular to length
    # - height: Height with meat in shell
    # - whole_weight: Weight of whole abalone
    # - shucked_weight: Weight of meat
    # - viscera_weight: Gut weight (after bleeding)
    # - shell_weight: Weight after being dried
    # Categorical Features:
    # - sex: categories encoded as strings {'M', 'F', 'I'} where 'I' is Infant
    numeric_features = list(feature_columns_names)
    numeric_features.remove('sex')
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())])

    categorical_features = ['sex']
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)],
        remainder="drop")

    preprocessor.fit(concat_data)

    joblib.dump(preprocessor, os.path.join(args.model_dir, "model.joblib"))
The next methods of the script are used during inference. The input_fn and output_fn methods are used by Amazon SageMaker to parse the data payload and reformat the response. In this example, the input method only accepts ‘text/csv’ as the content type, but it can easily be modified to accept other input formats. The input_fn function also checks the length of the CSV passed in to determine whether it is preprocessing training data, which includes the label, or prediction data. The output method returns JSON because, by default, the Inference Pipeline expects JSON between the containers, but it can be modified to add other output formats.
def input_fn(input_data, content_type):
    """Parse input data payload

    We currently only take csv input. Since we need to process both labelled
    and unlabelled data we first determine whether the label column is present
    by looking at how many columns were provided.
    """
    if content_type == 'text/csv':
        # Read the raw input data as CSV.
        df = pd.read_csv(StringIO(input_data), header=None)

        if len(df.columns) == len(feature_columns_names) + 1:
            # This is a labelled example, includes the ring label
            df.columns = feature_columns_names + [label_column]
        elif len(df.columns) == len(feature_columns_names):
            # This is an unlabelled example.
            df.columns = feature_columns_names

        return df
    else:
        raise ValueError("{} not supported by script!".format(content_type))
def output_fn(prediction, accept):
    """Format prediction output

    The default accept/content-type between containers for serial inference is JSON.
    We also want to set the ContentType or mimetype as the same value as accept so the next
    container can read the response payload correctly.
    """
    if accept == "application/json":
        instances = []
        for row in prediction.tolist():
            instances.append({"features": row})

        json_output = {"instances": instances}

        return worker.Response(json.dumps(json_output), accept, mimetype=accept)
    elif accept == 'text/csv':
        return worker.Response(encoders.encode(prediction, accept), accept, mimetype=accept)
    else:
        raise RuntimeError("{} accept type is not supported by this script.".format(accept))
Our predict_fn will take the input data, which was parsed by our input_fn, and the deserialized model from the model_fn (described in detail next) to transform the source data. The script also adds back labels if the source data had labels, which would be the case for preprocessing training data.
def predict_fn(input_data, model):
    """Preprocess input data

    We implement this because the default predict_fn uses .predict(), but our model is a preprocessor
    so we want to use .transform(). The output is returned in the following order:
    rest of features either one hot encoded or standardized
    """
    features = model.transform(input_data)

    if label_column in input_data:
        # Return the label (as the first column) and the set of features.
        return np.insert(features, 0, input_data[label_column], axis=1)
    else:
        # Return only the set of features
        return features
The model_fn takes the location of a serialized model and returns the deserialized model back to Amazon SageMaker. Note that this is the only method that does not have a default, because the definition of the method is closely linked to the serialization method implemented in training. In this example, we use the joblib library included with Scikit-learn.
def model_fn(model_dir):
    """Deserialize fitted model
    """
    preprocessor = joblib.load(os.path.join(model_dir, "model.joblib"))
    return preprocessor
b) Fit the data preprocessor
We now create a preprocessor using the script we defined in step (a). This will allow us to send raw data to the model and output the processed data. To do this, we define an SKLearn estimator that accepts several constructor arguments:
- entry_point: The path to the Python script that Amazon SageMaker runs for training and prediction (this is the script we defined in step (a)).
- role: Role Amazon Resource Name (ARN).
- train_instance_type (optional): The type of Amazon SageMaker instances for training. Note: Because Scikit-learn does not natively support GPU training, Amazon SageMaker Scikit-learn does not currently support training on GPU instance types.
- sagemaker_session (optional): The session used to train on Amazon SageMaker.
from sagemaker.sklearn.estimator import SKLearn

script_path = '/home/ec2-user/sample-notebooks/sagemaker-python-sdk/scikit_learn_inference_pipeline/sklearn_abalone_featurizer.py'

sklearn_preprocessor = SKLearn(
    entry_point=script_path,
    role=role,
    train_instance_type="ml.c4.xlarge",
    sagemaker_session=sagemaker_session)

sklearn_preprocessor.fit({'train': train_input})
It will take a few minutes (up to 5) for the preprocessor to be created. After the preprocessor is ready, we can send our raw data to the preprocessor and store our processed abalone data back in Amazon S3. We’ll do this in the next step.
c) Batch transform training data
Now that our preprocessor is ready, we can use it to batch transform raw data into preprocessed data for training. To do this, we create a transformer and point it to the raw data on Amazon S3:
# Define a SKLearn Transformer from the trained SKLearn Estimator
transformer = sklearn_preprocessor.transformer(
    instance_count=1,
    instance_type='ml.m4.xlarge',
    assemble_with='Line',
    accept='text/csv')

# Preprocess training input
transformer.transform(train_input, content_type='text/csv')
print('Waiting for transform job: ' + transformer.latest_transform_job.job_name)
transformer.wait()
preprocessed_train = transformer.output_path
When the transformer is done, our transformed data will be stored in Amazon S3. You can find the location of the preprocessed data by looking at the values in the preprocessed_train variable.
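For example, you can print that location and load one of the transformed objects back into a dataframe. The exact object key depends on your input file names (batch transform appends a .out suffix), and reading s3:// paths directly with pandas requires s3fs to be installed.
# Inspect the S3 prefix where the batch transform job wrote its output.
print(preprocessed_train)

# Hypothetical key: batch transform writes <input_file>.out under the output path.
import pandas as pd
df_processed = pd.read_csv(preprocessed_train + '/your_file.csv.out', header=None)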
4. Amazon SageMaker Setup
Now that all of our data is processed and well structured, we will start developing the machine learning model in Amazon SageMaker. For this we will be using a SageMaker notebook instance.
One of the biggest advantages of using SageMaker is that it supports popular open-source frameworks out of the box and provides support for deploying and managing models in production.
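As a brief, hedged illustration of that deployment support: any estimator fitted with the SageMaker Python SDK (for example the sklearn_preprocessor from the previous section) can be turned into a real-time endpoint with a single call. The instance type below is a placeholder; remember to delete the endpoint when you are done to avoid ongoing charges.
# Deploy a fitted SageMaker estimator as a real-time HTTPS endpoint.
predictor = sklearn_preprocessor.deploy(
    initial_instance_count=1,
    instance_type='ml.m4.xlarge')

# ... call predictor.predict(...) as needed, then clean up.
predictor.delete_endpoint()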
Log on to the AWS Console and then navigate to AWS SageMaker.
For using the SageMaker Jupyter Notebook we need to create an instance. This lets AWS know what types of resources we will require for powering our notebook.
You can find more details on SageMaker instance configurations, setup and cost on Amazon’s documentation webpage.
Now we will create a notebook instance.
For a large-scale job we will ideally want to spin up a larger instance that can support heavy memory and storage usage.
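If you prefer to script this step instead of clicking through the console, here is a minimal boto3 sketch; the notebook name, instance type, and IAM role ARN are placeholders.
import boto3

sm = boto3.client('sagemaker')

# Create a notebook instance (placeholder name, instance type, and role ARN).
sm.create_notebook_instance(
    NotebookInstanceName='ml-model-notebook',
    InstanceType='ml.t3.medium',
    RoleArn='arn:aws:iam::123456789012:role/YourSageMakerRole',
    VolumeSizeInGB=5)

# Check the status; it will move from Pending to InService.
status = sm.describe_notebook_instance(
    NotebookInstanceName='ml-model-notebook')['NotebookInstanceStatus']
print(status)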
Once the instance has been created and is ready to be used, its Status will change from Pending to InService. We can then open the Jupyter environment by clicking Open Jupyter or Open JupyterLab.
Now we can start working on our ml-model Jupyter notebook. When creating a new notebook we have to specify which kernel to use; in this case we will use conda_python3 to run Python 3 in the Jupyter notebook.
5. Machine Learning Model
Different machine learning algorithms work best for different use cases. Some of them are listed below.
TYPES OF ML ALGORITHMS:
1. Hypothesis Testing
2. Linear Regression
3. Logistic Regression
4. Clustering
5. ANOVA (analysis of variance)
6. Principal Component Analysis
7. Conjoint Analysis
8. Neural Networks
9. Decision Trees
10. Ensemble Methods
An example of a Linear Regression model:
import matplotlib.pyplot as plt
import numpy as np
from sklearn import linear_model
from sklearn.metrics import mean_squared_error, r2_score
import pandas as pd
import boto3
bucket = "your_bucket"
file_name = "your_file.csv"
s3 = boto3.client('s3')
obj = s3.get_object(Bucket=bucket, Key=file_name)
df = pd.read_csv(obj['Body'])

# Load the dataset
df_X = df.loc[:, df.columns != 'target-column']
df_y = df['target-column']
# Split the data into training/testing sets
df_X_train = df_X[:-20]
df_X_test = df_X[-20:]
# Split the targets into training/testing sets
df_y_train = df_y[:-20]
df_y_test = df_y[-20:]
# Create linear regression object
regr = linear_model.LinearRegression()
# Train the model using the training sets
regr.fit(df_X_train, df_y_train)
# Make predictions using the testing set
df_y_pred = regr.predict(df_X_test)
# The coefficients
print("Coefficients: \n", regr.coef_)

# The mean squared error
print("Mean squared error: %.2f" % mean_squared_error(df_y_test, df_y_pred))

# The coefficient of determination: 1 is perfect prediction
print("Coefficient of determination: %.2f" % r2_score(df_y_test, df_y_pred))
# Plot outputs (this assumes df_X has a single feature column, so the scatter plot is two-dimensional)
plt.scatter(df_X_test, df_y_test, color="black")
plt.plot(df_X_test, df_y_pred, color="blue", linewidth=3)
plt.xticks(())
plt.yticks(())
plt.show()
You can learn more about different Supervised and Unsupervised machine learning models and methods below.
Model Selection and Evaluation
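To make model selection a little more concrete, here is a small sketch (reusing the df_X and df_y frames from the example above) that compares two regressors with 5-fold cross-validation; the candidate models and scoring metric are just illustrative choices.
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor

# Hold out a test set, then compare candidates by cross-validated R^2 on the training data.
X_train, X_test, y_train, y_test = train_test_split(df_X, df_y, test_size=0.2, random_state=42)

candidates = [("linear_regression", LinearRegression()),
              ("random_forest", RandomForestRegressor(n_estimators=100, random_state=42))]

for name, model in candidates:
    scores = cross_val_score(model, X_train, y_train, cv=5, scoring="r2")
    print("%s: mean R^2 = %.3f (+/- %.3f)" % (name, scores.mean(), scores.std()))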
6. Cost/Billing
Now that all your services are up and running, it is important to know how your billing costs will change as your usage scales up. Here are some important resources for understanding the cost breakdown.
For an estimate of AWS resource costs you can check out the AWS Pricing Calculator (Here)
Configure a cost estimate that fits your unique business or personal needs with AWS products and services.
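If you also want to track actual spend from code rather than estimates, a hedged sketch using the Cost Explorer API is shown below; it assumes Cost Explorer is enabled on the account, and the date range is a placeholder.
import boto3

ce = boto3.client('ce')

# Last month's unblended cost, grouped by service (placeholder dates).
response = ce.get_cost_and_usage(
    TimePeriod={'Start': '2024-01-01', 'End': '2024-02-01'},
    Granularity='MONTHLY',
    Metrics=['UnblendedCost'],
    GroupBy=[{'Type': 'DIMENSION', 'Key': 'SERVICE'}])

for group in response['ResultsByTime'][0]['Groups']:
    print(group['Keys'][0], group['Metrics']['UnblendedCost']['Amount'])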