MLOps — Building End to End Pipeline using Sagemaker SDK & AWS CodeCommit

Md Sharique
Published in Analytics Vidhya
7 min read · Jan 23, 2022

In the post-COVID era, companies are increasingly automating their business processes with AI, and data scientists and data engineers are in steady demand to support these shifting business requirements. People often refer to this as the AI boom or the modern digital era.

With this change, data scientists are looking for ways to develop solutions quickly and deploy them on a device-agnostic platform that serves its users while keeping operating costs down. Once the solution is deployed in a production-ready environment, the data scientist needs to collect feedback on model performance and keep refining the model to achieve higher accuracy. This is leading companies to migrate their solutions from on-prem servers to the cloud, where they can change computing resources based on the model's requirements and pay per usage. This helps companies reduce their operating costs and helps clients get the model delivered quickly.

In this article, we will cover one such cloud solution on Amazon Web Services (AWS) that automates the complete process of model training, deployment, and monitoring in a single workflow that can be triggered with a single click, using AWS SageMaker and AWS CodeCommit.

Key AWS Resources used:

AWS Sagemaker, AWS CodeCommit, Amazon S3, AWS CodeBuild and Amazon EventBridge

Dataset Used

In this sample approach, we will use a commonly available dataset called “The Boston Housing Dataset”, which describes housing in Boston, MA, with 506 entries and 14 attributes: 13 features plus the target, the house price. The features include access to highways, the average number of rooms per dwelling, and proximity to the Charles River. We will solve a regression task, i.e., predict the price of a house from the 13 features using a neural network-based algorithm.

Sagemaker Pipeline

Now that the purpose and the approach are clear, we will first set up SageMaker Studio in AWS. Please refer to the link below for a step-by-step guide:

Once we are logged in to the SageMaker Studio JupyterLab environment, we will create a project using an existing SageMaker project template. Please select the “MLOps template for model building, training, and deployment” template as shown in the image below.

AWS Sagemaker Project creation

Now that the project is set up, we will dive into the code. Before getting started, we first update the SageMaker SDK to the latest version:

!pip install sagemaker --upgrade

Now we will initialize the sagemaker session, region, and s3 bucket name where we want to store data, experiment logs, and model artifacts:

import boto3
import os
import sagemaker
import tensorflow as tf

sess = sagemaker.session.Session()
bucket = sess.default_bucket()
region = boto3.Session().region_name

Define Parameter

Once we have uploaded the data to the S3 bucket defined above, we will edit the pipeline.py file, which contains the source code for model training and registration. We first define the key settings as pipeline parameters, which lets us run custom executions and schedules without modifying the pipeline definition. In the code below, s3_path is assumed to hold the S3 URI of the uploaded data:

from sagemaker.workflow.parameters import ParameterInteger, ParameterString

# package versions
sklearn_version = ParameterString(name="SKLearnVersion", default_value="0.23-1")
tensorflow_version = ParameterString(name="TensorFlowVersion", default_value="2.3.1")
python_version = ParameterString(name="PythonVersion", default_value="py37")

# raw input data
input_data = ParameterString(name="InputData", default_value=s3_path)

# processing step parameters
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=3)

# training step parameters
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.c5.2xlarge")
training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)

# batch inference step parameters
batch_instance_type = ParameterString(name="BatchInstanceType", default_value="ml.c5.xlarge")
batch_instance_count = ParameterInteger(name="BatchInstanceCount", default_value=1)
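As a minimal sketch of how these parameters might be used at run time: the SageMaker SDK's `Pipeline.start` accepts a `parameters` dictionary keyed by parameter name. The helper below is hypothetical (its name and the name check are not part of the original project). It assumes the `pipeline` object that is built later in this article and simply refuses overrides whose names were not declared above.

```python
# Names declared with ParameterString/ParameterInteger in this article.
DECLARED_PARAMETERS = {
    "SKLearnVersion", "TensorFlowVersion", "PythonVersion", "InputData",
    "ProcessingInstanceType", "ProcessingInstanceCount",
    "TrainingInstanceType", "TrainingInstanceCount",
    "BatchInstanceType", "BatchInstanceCount",
}


def start_with_overrides(pipeline, overrides):
    """Start a pipeline execution, overriding selected parameter defaults.

    `pipeline` is expected to be a sagemaker.workflow.pipeline.Pipeline;
    this wrapper itself is a hypothetical convenience helper.
    """
    unknown = sorted(set(overrides) - DECLARED_PARAMETERS)
    if unknown:
        raise ValueError(f"Undeclared pipeline parameters: {unknown}")
    # Parameters not listed in `overrides` keep their default_value.
    return pipeline.start(parameters=dict(overrides))
```

A call such as `start_with_overrides(pipeline, {"ProcessingInstanceCount": 1})` would then run the same pipeline definition with a single processing instance. One caveat: the code in this article reads `.default_value` for the three version parameters when the pipeline is defined, so overriding those at start time would likely not change the container images.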

Define Processing Steps

The first step in the pipeline is to preprocess the input data. For this, we instantiate an SKLearnProcessor object, which lets us specify the instance type and count for the job using the parameters defined above. We can also attach tags, here and on SageMaker training jobs and endpoints, for cost allocation and other purposes.

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processing_tags = [{'Key': 'pipeline-demo', 'Value': 'TFDemoProcessing'}]

sklearn_processor = SKLearnProcessor(
    framework_version=sklearn_version.default_value,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="tensorflow-demo-process",
    sagemaker_session=sess,
    role=sagemaker.get_execution_role(),
    tags=processing_tags)

step_process = ProcessingStep(
    name="TFDemo",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input", s3_data_distribution_type='ShardedByS3Key'),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="./scripts/preprocessing.py")
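The article does not show preprocessing.py itself. The following is only a sketch of what such a script might look like, under several assumptions that are not from the original: the input consists of CSV files with a header row, the split is a shuffled 80/20 split, and the output file is named data.csv. The container paths are the ones used in the ProcessingInput and ProcessingOutput definitions above.

```python
import csv
import os
import random


def split_csv_files(input_dir, train_dir, test_dir, test_fraction=0.2, seed=42):
    """Read every CSV in input_dir and write shuffled train/test splits."""
    header, rows = None, []
    for name in sorted(os.listdir(input_dir)):
        if not name.endswith(".csv"):
            continue
        with open(os.path.join(input_dir, name), newline="") as f:
            reader = csv.reader(f)
            file_header = next(reader, None)
            if file_header is None:
                continue  # skip empty files
            header = header or file_header
            rows.extend(reader)

    # Shuffle deterministically, then carve off the test portion.
    random.Random(seed).shuffle(rows)
    n_test = int(len(rows) * test_fraction)
    splits = {train_dir: rows[n_test:], test_dir: rows[:n_test]}

    for out_dir, subset in splits.items():
        os.makedirs(out_dir, exist_ok=True)
        with open(os.path.join(out_dir, "data.csv"), "w", newline="") as f:
            writer = csv.writer(f)
            if header:
                writer.writerow(header)
            writer.writerows(subset)
    return len(splits[train_dir]), len(splits[test_dir])


if __name__ == "__main__":
    # Only runs inside the processing container, where this path exists.
    base = "/opt/ml/processing"
    if os.path.isdir(f"{base}/input"):
        split_csv_files(f"{base}/input", f"{base}/train", f"{base}/test")
```

With more than one processing instance, each instance would run this script on its own share of the input and write its own split.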

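With the ShardedByS3Key distribution type, the S3 objects under the input prefix are divided among the n instances defined above, so each instance processes only its share. If the data is stored as several similarly sized objects, this can speed up the transformation by up to roughly a factor of n.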
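To make the idea concrete, here is a small illustration of dividing object keys among instances. It is a toy model only: the round-robin rule and the object keys are assumptions for illustration, and the actual assignment is done by SageMaker and may differ.

```python
def shard_keys(keys, instance_count):
    """Assign S3 object keys to instances round-robin (illustrative only)."""
    shards = [[] for _ in range(instance_count)]
    for i, key in enumerate(sorted(keys)):
        shards[i % instance_count].append(key)
    return shards


# Hypothetical object keys under the input prefix.
keys = [f"data/part-{i:02d}.csv" for i in range(7)]
shards = shard_keys(keys, 3)

# A single object cannot be divided further: one instance gets it all.
single_file_shards = shard_keys(["data/housing.csv"], 3)
```

The second call shows why the layout of the data matters: if everything sits in one object, only one instance receives work and the extra instances add cost without adding speed.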

Define Training, Model Creation, and Model Registry Steps

The below code will set up the pipeline step for a training job using the prebuilt TensorFlow docker container with a specified TensorFlow version.

from sagemaker.tensorflow import TensorFlow
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.step_collections import RegisterModel

image_uri_train = sagemaker.image_uris.retrieve(
    framework="tensorflow",
    region=region,
    version=tensorflow_version.default_value,
    py_version=python_version.default_value,
    instance_type=training_instance_type,
    image_scope="training")

For more such prebuilt sagemaker images, please refer to the below link:

Next, we need to specify an Estimator object and define a HyperparameterTuner object based on the default Bayesian Optimization tuning strategy to run hyperparameter tuning on the parameters defined below.

import time

model_path = f"s3://{bucket}/TFDemoTrain"
training_parameters = {'epochs': 20, 'batch_size': 64, 'learning_rate': 0.001, 'for_pipeline': 'true'}
training_metrics = [
    {
        "Name": "training:loss",
        "Regex": ".*step - loss: ([0-9\\.]+) - val_loss: [0-9\\.]+ - batch: [0-9\\.]+.*",
    },
    {
        "Name": "validation:loss",
        "Regex": ".*step - loss: [0-9\\.]+ - val_loss: ([0-9\\.]+) - batch: [0-9\\.]+.*",
    },
]
training_tags = [{'Key': 'pipeline-demo', 'Value': 'TFDemoTrain'}]

estimator = TensorFlow(
    image_uri=image_uri_train,
    metric_definitions=training_metrics,
    tags=training_tags,
    source_dir='./scripts/',
    entry_point='train.py',
    instance_type=training_instance_type,
    instance_count=training_instance_count,
    role=sagemaker.get_execution_role(),
    base_job_name="tensorflow-demo-train",
    output_path=model_path,
    hyperparameters=training_parameters)
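The metric regexes above can be checked locally before launching a job. The sketch below applies the same two patterns to a made-up log line in the shape they expect. The log line is an assumption, since the article does not show the output of train.py.

```python
import re

TRAIN_LOSS = ".*step - loss: ([0-9\\.]+) - val_loss: [0-9\\.]+ - batch: [0-9\\.]+.*"
VAL_LOSS = ".*step - loss: [0-9\\.]+ - val_loss: ([0-9\\.]+) - batch: [0-9\\.]+.*"

# Hypothetical log line; the real output of train.py is not shown in the article.
line = "8/8 [==============================] - 0s 3ms/step - loss: 0.5231 - val_loss: 0.6120 - batch: 7.0000"


def extract(pattern, text):
    """Return the first captured group as a float, or None if nothing matches."""
    match = re.search(pattern, text)
    return float(match.group(1)) if match else None


train_loss = extract(TRAIN_LOSS, line)
val_loss = extract(VAL_LOSS, line)

# Without the trailing "- batch: ..." part, these patterns do not match.
no_batch = extract(TRAIN_LOSS, "8/8 [====] - 0s 3ms/step - loss: 0.5231 - val_loss: 0.6120")
```

Note that both patterns require a trailing `- batch: ...` field, so the training script has to log that value for the metrics to be captured.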

Hyperparameter tuner definition:

from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner

hyperparameter_ranges = {
    'learning_rate': ContinuousParameter(0.001, 0.2, scaling_type="Logarithmic"),
    'epochs': IntegerParameter(5, 30),
    'batch_size': IntegerParameter(64, 256),
}

metric_definitions = [
    {'Name': 'loss', 'Regex': 'loss: ([0-9\\.]+)'},
    {'Name': 'val_loss', 'Regex': 'val_loss: ([0-9\\.]+)'},
]

tuning_tags = [{'Key': 'pipeline-demo', 'Value': 'TFDemoTuning'}]
tuner = HyperparameterTuner(
    estimator,
    'val_loss',
    hyperparameter_ranges,
    metric_definitions,
    max_jobs=30,
    max_parallel_jobs=10,
    objective_type='Minimize',
    tags=tuning_tags)
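The `scaling_type="Logarithmic"` setting on the learning rate is worth a short illustration. The sketch below is not how the tuner picks values (Bayesian optimization does not sample at random); it only shows what searching in log space means for the range 0.001 to 0.2 used above.

```python
import math
import random


def sample_log_uniform(low, high, rng):
    """Draw one value uniformly in log space between low and high."""
    return math.exp(rng.uniform(math.log(low), math.log(high)))


rng = random.Random(0)
samples = [sample_log_uniform(0.001, 0.2, rng) for _ in range(10_000)]

# Share of draws that fall in the lowest decade, 0.001 to 0.01.
share_below_0_01 = sum(s < 0.01 for s in samples) / len(samples)
```

In log space the interval from 0.001 to 0.01 covers ln(10) / ln(200), or about 43% of the range, whereas on a linear scale it would cover under 5%. This is why logarithmic scaling is a common choice for learning rates that span several orders of magnitude.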

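Next, we define a TrainingStep to insert the training job into the pipeline, with inputs taken from the outputs of the previous processing step. Note that this step uses the estimator directly; the tuner defined above is not part of the step.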

step_train = TrainingStep(
    name="TFDemoTrain",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri
        ),
        "test": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri
        ),
    },
)
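On the receiving side, the training script has to pick up these channels and the hyperparameters. The article does not show train.py, so the following is only a sketch of how its argument handling might look. It assumes SageMaker script mode, where hyperparameters arrive as command-line arguments and each input channel is exposed through an `SM_CHANNEL_<NAME>` environment variable.

```python
import argparse
import os


def parse_args(argv=None):
    """Parse hyperparameters and channel locations for a training script."""
    parser = argparse.ArgumentParser()
    # Hyperparameters from the estimator's `hyperparameters` dict.
    parser.add_argument("--epochs", type=int, default=20)
    parser.add_argument("--batch_size", type=int, default=64)
    parser.add_argument("--learning_rate", type=float, default=0.001)
    parser.add_argument("--for_pipeline", type=str, default="false")
    # One channel directory per key of the TrainingStep `inputs` dict.
    parser.add_argument(
        "--train", default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train")
    )
    parser.add_argument(
        "--test", default=os.environ.get("SM_CHANNEL_TEST", "/opt/ml/input/data/test")
    )
    # Ignore any extra arguments the container passes to the script.
    args, _unknown = parser.parse_known_args(argv)
    return args
```

Using `parse_known_args` keeps the script from failing when the container passes arguments that the script does not declare.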

Once the training step is defined, we create a SageMaker Model object to wrap the model artifact and associate it with a separate SageMaker prebuilt TensorFlow Serving container for inference:

from sagemaker.model import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep

image_uri_inference = sagemaker.image_uris.retrieve(
    framework="tensorflow",
    region=region,
    version=tensorflow_version.default_value,
    py_version=python_version.default_value,
    instance_type=batch_instance_type,
    image_scope="inference")
model = Model(
    image_uri=image_uri_inference,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sess,
    role=sagemaker.get_execution_role())

inputs_model = CreateModelInput(instance_type=batch_instance_type)

step_create_model = CreateModelStep(name="TFDemoCreateModel", model=model, inputs=inputs_model)

The final step is to register the model in the SageMaker Model Registry for version control and improved model governance. With the model registry, we can manage different model versions and use it as part of a CI/CD workflow for deploying models to a SageMaker endpoint.

from sagemaker.workflow.step_collections import RegisterModel

step_register = RegisterModel(
    name="TFDemoRegisterModel",
    estimator=estimator,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.xlarge"],
    transform_instances=["ml.c5.xlarge"],
    model_package_group_name="TFDemoModelPackageGroup",
    image_uri=image_uri_inference)

Define Pipeline

In addition to these steps, we can create a similar step for batch inference (step_batch in the code below; its definition is not shown here). Once all the steps are defined, we stitch them together in a Pipeline object so that they run in an automated fashion. The pipeline is also integrated with SageMaker Experiments, which lets us organize, track, compare, and evaluate ML experiments.

from sagemaker.workflow.pipeline import Pipeline, PipelineExperimentConfig
from sagemaker.workflow.execution_variables import ExecutionVariables

pipeline = Pipeline(
    name="TFDemo",
    parameters=[
        sklearn_version,
        tensorflow_version,
        python_version,
        input_data,
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        training_instance_count,
        batch_instance_type,
        batch_instance_count,
    ],
    steps=[
        step_process,
        step_train,
        step_create_model,
        step_register,
        step_batch,  # batch inference step; its definition is not shown in this article
    ],
    pipeline_experiment_config=PipelineExperimentConfig("TFDemoExperiment", ExecutionVariables.PIPELINE_EXECUTION_ID),
    sagemaker_session=sess)
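The execution order of the steps follows from the data dependencies between them, for example the training step reading the processing step's output properties, rather than from their position in the `steps` list. As a rough illustration only, the sketch below models the four steps defined in this article as a dependency graph using Python's standard library (Python 3.9+). The batch step is left out because its definition, and therefore its inputs, are not shown.

```python
from graphlib import TopologicalSorter

# Step name -> names of the steps whose properties it reads.
dependencies = {
    "TFDemo": set(),                       # processing step, no upstream step
    "TFDemoTrain": {"TFDemo"},             # reads the processing outputs
    "TFDemoCreateModel": {"TFDemoTrain"},  # reads the model artifact
    "TFDemoRegisterModel": {"TFDemoTrain"},
}

# One valid order in which the steps could run.
execution_order = list(TopologicalSorter(dependencies).static_order())
```

In this graph the model creation and registration steps depend only on training, so they are free to run in either order or in parallel once training has finished.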

Automated Workflow Execution

Now, since the code is ready to be executed, we can either execute it manually through the code:

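role = sagemaker.get_execution_role()  # IAM role used to create or update the pipeline
pipeline_tags = [{'Key': 'pipeline-demo', 'Value': 'TFDemoPipeline'}]
pipeline.upsert(role_arn=role, tags=pipeline_tags)
execution = pipeline.start()
execution.wait()  # block until the execution finishes
execution.list_steps()  # status of each step in this execution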

Or we can simply git push the edited code to AWS CodeCommit, which automatically triggers the pipeline to build and train the model. Once the model is trained and registered in the SageMaker Model Registry, it is deployed through a manual approval process.

Pipeline Execution through AWS Sagemaker Studio resource pipelines section

Conclusion

AWS SageMaker Studio is a great place to automate the complete end-to-end process of model development and deployment through the interactive UI that AWS provides under SageMaker resources. The pipeline section shows all the execution details. A new execution can be started with a few parameter changes and clicks, and the rest is handled automatically by AWS. If we drill into a single execution, we can visualize the complete pipeline flow along with inputs, outputs, and logs, as shown in the image below.

Once the model has been trained and registered, we can approve it based on its validation score. After we approve the registered model, it moves to the staging deployment stage. At this stage, if we are satisfied with the test results (for example, from blue-green or A/B testing), we can further approve the model for production deployment.

Model Deployment Step through Manual Approval
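The approval described here is done by hand in the Studio UI. If one wanted to script the same decision, a minimal sketch could look like the following. It relies on the boto3 SageMaker client's `update_model_package` call; the helper name, the threshold, and the idea of approving on validation loss alone are assumptions for illustration and are not part of the original workflow.

```python
def set_approval(sm_client, model_package_arn, val_loss, max_val_loss):
    """Approve or reject a registered model version based on validation loss.

    `sm_client` is expected to be boto3.client("sagemaker"); the threshold
    `max_val_loss` is a hypothetical acceptance criterion.
    """
    status = "Approved" if val_loss <= max_val_loss else "Rejected"
    sm_client.update_model_package(
        ModelPackageArn=model_package_arn,
        ModelApprovalStatus=status,
    )
    return status
```

In practice the ARN would come from the model package group created by the registration step, and a human review would usually remain in place before production deployment.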

Stay tuned to learn more about the different approaches we use in day-to-day work. Follow me on LinkedIn to interact and share ideas: https://www.linkedin.com/in/mdsharique0107/
