Building recsys workflows: From idea to production in days, instead of weeks

Ramanathan R
Published in SBX stories
14 min read · Sep 27, 2021
Photo by Chris Peeters from Pexels

TL;DR: Technology blog on how we deployed a reciprocal recommender system using AWS Step Functions, Data Science SDK and other AWS infrastructure/services. Discusses the problem and our solution design and architecture with code snippets. Codebase can be extended to productionize custom scripts for similar MLOps pipelines.

📋 Introduction

“Ideas are cheap. Execution is everything” — Chris Sacca

Well, there could exist a handful of rare ideas that are invaluable. But they don’t mean squat unless they are prototyped, implemented and tested. More importantly, implemented at speed which gives an unfair advantage over competition as the idea scales up.

This post shows how, within days, we went from a machine learning model in Jupyter Notebook to a production-grade system that serves billions of recommendations for one of the leading online dating companies in Japan with more than five million registered users.

📂 Background

With most of the common algorithms available off-the-shelf and optimized for distributed processing, building a recommender engine today using services like AWS Personalize is as simple as uploading and formatting the input data, selecting a training algorithm and deploying the solution.

However, productionizing a custom-built recommender algorithm has its fair share of challenges, from developing a scalable algorithm to having robust MLOps baked in. MLOps is a buzzword for packaging ML models into microservices in production, designing these microservices to be loosely coupled, setting up hooks for them to talk to each other via event-driven methods, and deploying and monitoring them. We will focus on MLOps here.

👨‍💻 Architecture Overview

Our recommender system processes the input data (implicit feedback via clicks and explicit user-profile data) collected from the upstream mobile app and, for each user, returns a set of potential users who are likely to match. The recommendations are generated by an ML model that gets retrained periodically. The system consists of the following four core components:

  1. Preprocessing
  2. Training
  3. Batch Inference
  4. Live inference endpoint

Figure 1 shows the individual components of our architecture and the flow of control and data between them.

Figure 1: Solution Architecture using AWS services

However, when it comes to writing code to implement a distributed serverless recommender system, the more fine-grained the microservices become, the more the chances of the connections between them getting messy. If we are not careful, the system can easily snowball into a spaghetti monster of nanoservices and managing the flow of events through such an entangled web will be a nightmare. To avoid this overhead, it’s important we start with a simple architecture and increasingly introduce granularity as required.

Equally important is the choice of frameworks and libraries upon which we build and orchestrate our ecosystem of services. Frameworks with clear-cut design principles that enable quick prototyping and rapid evolution of ideas, and that are performant and fault-tolerant, go a long way in shipping a robust product on time.

We built our architecture on top of the AWS Step Functions Data Science SDK, a recently released Python-based library that enables orchestration of AWS services at scale, without having to provision and integrate them individually. Here, the flow of control between the individual components is defined and governed by the Step Functions Data Science SDK.

Figure 2: An abstracted view of the same architecture via the lens of Step Functions Data Science SDK

From Figure 2, we can see how the Step Functions workflow tightly encapsulates our core ML services. We will now describe how to build the end-to-end recommender workflow using this framework.

🔧 Setting up the resources

Imports

First, let’s set up all the required modules. If not already done, we download and install the latest version of the Step Functions Data Science SDK, SageMaker, Boto3 and other required libraries. We will be using the TrainingStep, ModelStep, ExecutionInput and Workflow classes of the Step Functions library.

import sys
!{sys.executable} -m pip install --upgrade stepfunctions
import uuid
import logging
import stepfunctions
import boto3
import sagemaker
import zipfile
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker import s3_input
from sagemaker.s3 import S3Uploader
from stepfunctions import steps
from stepfunctions.steps import TrainingStep, ModelStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow
glue_client = boto3.client('glue')
lambda_client = boto3.client('lambda')

Dockerizing the custom algorithms

Next, we build and push the docker image for our preprocessing, training and inference routines that will be used by SageMaker, to AWS ECR. This image contains our customized algorithms and their scripts for each of the routines. Specifically, we will define how our training and inference algorithms will have to be invoked in the training_code/train and serving_code/serve scripts.

SageMaker will automatically run these scripts whenever a training job is triggered or a server hosting instance is spawned respectively. We will also be setting up the protocols for sending and receiving data between these routines and S3 via Lambda functions in the following sections.
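For readers unfamiliar with the container contract, the sketch below shows roughly what a train entry point can look like. It is not our actual script: the /opt/ml directory layout is SageMaker's standard convention for custom containers, while `run_training`, the `base_dir` parameter and the placeholder `model.json` artifact are hypothetical names added for illustration. The hyperparameter names are borrowed from the estimator defined later in this post.

```python
"""Illustrative skeleton of a SageMaker `train` entry point (not the production script)."""
import json
import tempfile
from pathlib import Path


def run_training(base_dir="/opt/ml"):
    base = Path(base_dir)

    # SageMaker writes the job's hyperparameters here; every value arrives as a string.
    hp_path = base / "input" / "config" / "hyperparameters.json"
    hyperparameters = json.loads(hp_path.read_text()) if hp_path.exists() else {}
    vector_size = int(hyperparameters.get("vector_size", 50))
    epoch_count = int(hyperparameters.get("epoch_count", 40))

    # Files for the "training" channel are mounted under input/data/training.
    data_dir = base / "input" / "data" / "training"
    input_files = sorted(p.name for p in data_dir.glob("*")) if data_dir.exists() else []

    # Placeholder for the real algorithm: we only record what the artifact would describe.
    # Anything written to the model directory is packaged and uploaded when the job ends.
    model_dir = base / "model"
    model_dir.mkdir(parents=True, exist_ok=True)
    artifact = {"vector_size": vector_size, "epoch_count": epoch_count, "inputs": input_files}
    (model_dir / "model.json").write_text(json.dumps(artifact))
    return artifact


# Local dry run against a temporary directory instead of /opt/ml.
with tempfile.TemporaryDirectory() as tmp:
    config_dir = Path(tmp) / "input" / "config"
    config_dir.mkdir(parents=True)
    (config_dir / "hyperparameters.json").write_text(
        json.dumps({"vector_size": "50", "epoch_count": "40"}))
    print(run_training(tmp))
```

Inside the container, the executable train script would simply call `run_training()` with the default base directory.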

We will not go into the details of our customized training algorithms for the purpose of this blog. Our framework is generic enough that any recommender system pipeline that has its own preprocessing, training and inference routines can be built on our framework.

%%sh
algorithm_name=reciprocal-recommender
cd container
chmod +x training_code/train
chmod +x serving_code/serve
account=$(aws sts get-caller-identity --query Account --output text)
# Get the region defined in the current configuration (default to ap-northeast-1 if none defined)
region=$(aws configure get region)
region=${region:-ap-northeast-1}
fullname="${account}.dkr.ecr.${region}.amazonaws.com/${algorithm_name}:latest"
# If the repository doesn't exist in ECR, create it.
aws ecr describe-repositories --repository-names "${algorithm_name}" > /dev/null 2>&1
if [ $? -ne 0 ]
then
    aws ecr create-repository --repository-name "${algorithm_name}" > /dev/null
fi
# Get the login command from ECR and execute it directly
# (AWS CLI v1 syntax; AWS CLI v2 replaces get-login with get-login-password piped into docker login)
$(aws ecr get-login --region ${region} --no-include-email)
# Build the docker image locally with the image name and then push it to ECR
# with the full name.
docker build -t ${algorithm_name} -f Dockerfile.gpu .
docker tag ${algorithm_name} ${fullname}
docker push ${fullname}

Defining the inputs for the workflow

For each execution of a Step Functions workflow, we parametrize the run by passing a set of input parameters. In addition, every step sends its output to the next step. We will use both kinds of parameters in the following steps to create dynamic workflows. The microservices in our system communicate with each other through these parameters, so they have to be fixed before we proceed to the individual steps in the workflow.

The following code defines the execution input for a single run of our workflow:

# Set up the workflow input schema
execution_input = ExecutionInput(schema={
    'PreprocessingJobName': str,
    'TrainingJobName': str,
    'ProcessingLambdaFunctionName': str,
    'BatchPredJobName': str,
    'CreateBatchPredLambdaFunctionName': str,
    'CreatePreprocessingLambdaFunctionName': str,
    'GlueBatchJobName': str,
    'ModelName': str,
    'S3ModelPath': str,
    'S3PreprocessedPath': str,
    'S3RecommendationsPath': str,
    'EndpointName': str,
    'DoTraining': bool,
    'DoPreprocessing': bool,
    'DoBatchRecommend': bool,
    'CreateNewEndpoint': bool
})
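Because every microservice reads its arguments from this input, it can help to check a candidate input against the schema before starting a run. The helper below is a minimal, standalone sketch of that idea in plain Python. `validate_execution_input` is a hypothetical function, not part of the Step Functions Data Science SDK, and the shortened schema and sample values are for illustration only.

```python
def validate_execution_input(schema, inputs):
    """Return a list of problems; an empty list means the input fits the schema."""
    problems = []
    for key, expected_type in schema.items():
        if key not in inputs:
            problems.append("missing key: {}".format(key))
        elif not isinstance(inputs[key], expected_type):
            problems.append("{} should be {}, got {}".format(
                key, expected_type.__name__, type(inputs[key]).__name__))
    for key in inputs:
        if key not in schema:
            problems.append("unexpected key: {}".format(key))
    return problems


# Shortened version of the schema above, for illustration only.
example_schema = {'TrainingJobName': str, 'S3ModelPath': str, 'DoTraining': bool}

good = {'TrainingJobName': 'train-001', 'S3ModelPath': 's3://bucket/model', 'DoTraining': True}
bad = {'TrainingJobName': 'train-001', 'DoTraining': 'yes'}

print(validate_execution_input(example_schema, good))
print(validate_execution_input(example_schema, bad))
```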

We now come to the central part of the Step Functions Data Science SDK based architecture. We create the pipelines in the workflow in reverse order, starting from the end: inference, then training, then preprocessing.

Let’s start with the inference pipeline and create a Batch Inference step. The Batch Inference step generates the recommendations for all the users and saves the results in S3.

Creating the Batch Inference job

We start with the Lambda function that creates and triggers the Batch Inference job. The script create_batch_pred_job.py defines the behavior of the Lambda function, the S3 location that points to the processed input data, the instance type for the job run and the docker execution command to invoke the custom batch inference script.

# Initialize the name for the Lambda function
create_batch_pred_function_name = 'create-batch-pred-job'
# Compress the code for the Lambda function in a zip file
batch_pred_zip_name = 'create_batch_pred_job.zip'
batch_pred_source_code = 'container/training_code/create_batch_pred_job.py'
with zipfile.ZipFile(batch_pred_zip_name, 'w') as zf:
    zf.write(batch_pred_source_code, arcname=batch_pred_source_code.split('/')[-1])
# Upload the zip file to S3
S3Uploader.upload(local_path=batch_pred_zip_name,
                  desired_s3_uri='s3://{}/{}'.format(bucket, project_name),
                  session=session)
# Create the Lambda function from the S3 location
response = lambda_client.create_function(
    FunctionName=create_batch_pred_function_name,
    Runtime='python3.7',
    Role=lambda_role,
    Handler='create_batch_pred_job.lambda_handler',
    Code={
        'S3Bucket': bucket,
        'S3Key': '{}/{}'.format(project_name, batch_pred_zip_name)
    },
    Description='Triggers the inference job that generates the recommendations',
    Timeout=15,
    MemorySize=128
)
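The contents of create_batch_pred_job.py are not reproduced in this post. As a rough sketch of what such a handler might look like, the code below maps the `Configuration` payload (the keys are taken from the state machine definition shown later) onto the arguments of SageMaker's `create_processing_job` call. The instance type, container entry point, local paths and the `build_processing_request` helper are assumptions made for illustration, not our production values.

```python
def build_processing_request(config, instance_type="ml.p2.xlarge"):
    """Translate the step's 'Configuration' payload into create_processing_job arguments."""
    return {
        "ProcessingJobName": config["JobName"],
        "RoleArn": config["IAMRole"],
        "AppSpecification": {
            "ImageUri": config["EcrContainerUri"],
            # Hypothetical entry point; the real command depends on the image.
            "ContainerEntrypoint": ["python3", "batch_inference.py"],
        },
        "ProcessingResources": {
            "ClusterConfig": {
                "InstanceCount": 1,
                "InstanceType": instance_type,  # assumed value
                "VolumeSizeInGB": config["LocalStorageSizeGB"],
            }
        },
        "ProcessingInputs": [{
            "InputName": "model",
            "S3Input": {
                "S3Uri": config["S3InputDataPathModelData"],
                "LocalPath": "/opt/ml/processing/model",
                "S3DataType": "S3Prefix",
                "S3InputMode": "File",
            },
        }],
        "ProcessingOutputConfig": {
            "Outputs": [{
                "OutputName": "recommendations",
                "S3Output": {
                    "S3Uri": config["S3OutputDataPath"],
                    "LocalPath": "/opt/ml/processing/output",
                    "S3UploadMode": "EndOfJob",
                },
            }]
        },
    }


def lambda_handler(event, context, sagemaker_client=None):
    """Start the Processing job; the client is injectable so the logic can be tested offline."""
    if sagemaker_client is None:
        import boto3  # imported lazily so the request builder works without AWS access
        sagemaker_client = boto3.client("sagemaker")
    request = build_processing_request(event["Configuration"])
    response = sagemaker_client.create_processing_job(**request)
    return {"ProcessingJobArn": response["ProcessingJobArn"]}
```

Keeping the request construction in a pure function means it can be unit-tested without touching AWS, while the handler stays a thin wrapper around the API call.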

In the following code block, we create the Batch Inference step using the Lambda function that we just created and pass the model path and the output path parameters to it.

At the time of writing, the AWS Step Functions Data Science SDK does not yet support ProcessingStep. Hence, we need a Lambda function to start the batch inference, which is encoded as a SageMaker Processing job.

# Create the Lambda step that starts the Batch Inference (SageMaker Processing) job.
# The parameters mirror the "Create Batch Inference Job" state in the state machine
# definition shown later in this post; the role and image variables are assumed to be
# the same ones used for the training estimator.
create_batch_pred_job_step = steps.compute.LambdaStep(
    'Create Batch Inference Job',
    parameters={
        "FunctionName": execution_input['CreateBatchPredLambdaFunctionName'],
        "Payload": {
            "Configuration": {
                "JobName": execution_input['BatchPredJobName'],
                "IAMRole": sagemaker_execution_role,
                "LocalStorageSizeGB": 50,
                "S3InputDataPathModelData": execution_input['S3ModelPath'],
                "S3OutputDataPath": execution_input['S3RecommendationsPath'],
                "EcrContainerUri": training_container_uri
            }
        }
    },
    result_path='$.CreateBatchPredLambdaResult'
)

Checking the status of the Batch Inference job

Next, we create the Lambda function that periodically queries the SageMaker Processing job and returns its status: running, failed or succeeded.

# Initialize the name for the Lambda function
processing_function_name = 'query-processing-status'
# Compress the code for the Lambda function in a zip file
query_processing_zip_name = 'query_processing_status.zip'
query_processing_lambda_source_code = './container/training_code/query_processing_status.py'
with zipfile.ZipFile(query_processing_zip_name, mode='w') as zf:
    zf.write(query_processing_lambda_source_code,
             arcname=query_processing_lambda_source_code.split('/')[-1])
# Upload the zip file to S3
S3Uploader.upload(local_path=query_processing_zip_name,
                  desired_s3_uri='s3://{}/{}'.format(bucket, project_name),
                  session=session)
# Create the Lambda function from the S3 location
response = lambda_client.create_function(
    FunctionName=processing_function_name,
    Runtime='python3.7',
    Role=lambda_role,
    Handler='query_processing_status.lambda_handler',
    Code={
        'S3Bucket': bucket,
        'S3Key': '{}/{}'.format(project_name, query_processing_zip_name)
    },
    Description='Queries a SageMaker processing job and returns the results',
    Timeout=15,
    MemorySize=128
)
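The status-query script itself is not listed here either. A minimal sketch of such a handler, assuming it only needs the job name from the step payload, could look like the following. `describe_processing_job` and its `ProcessingJobStatus` field are part of the SageMaker API; the returned dictionary shape and the injectable client argument are illustrative choices.

```python
def lambda_handler(event, context, sagemaker_client=None):
    """Look up a SageMaker Processing job and report its current status."""
    if sagemaker_client is None:
        import boto3  # imported lazily so the handler can be exercised with a stub client
        sagemaker_client = boto3.client("sagemaker")

    job_name = event["ProcessingJobName"]
    description = sagemaker_client.describe_processing_job(ProcessingJobName=job_name)

    # ProcessingJobStatus is one of: InProgress, Completed, Failed, Stopping, Stopped.
    return {
        "ProcessingJobName": job_name,
        "ProcessingJobStatus": description["ProcessingJobStatus"],
        "FailureReason": description.get("FailureReason", ""),
    }
```

The `ProcessingJobStatus` key in the return value is what the workflow's Choice step inspects later on.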

Create a Lambda step that encapsulates the Lambda function above into the workflow.

Internally, the Data Science SDK converts our Step Functions workflow into a state machine defined in Amazon States Language, a JSON-based, structured language used to define our collection of states and the transitions between them. The notion of a state machine allows us to abstract the individual microservices and the connections between them into a structured form that enables rapid development and testing.

By default, individual states in a Step Functions execution receive an input from the previous state in the form of JSON text and pass only the output of the current state as JSON output to the next state. But we want both the results of the current state (Batch Inference status) and the execution input arguments from the first state to be available at a later state. This can be achieved with the ResultPath option, which specifies where in the state's input the result should be inserted. The input is preserved, and the combined JSON is passed to the next state.

lambda_step_batch_pred = steps.compute.LambdaStep(
    'Query Batch Inference Results',
    parameters={
        "FunctionName": execution_input['ProcessingLambdaFunctionName'],
        "Payload": {
            "ProcessingJobName.$": "$.BatchPredJobName"
        }
    },
    result_path='$.BatchPredLambdaResult'
)
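To make the effect of ResultPath concrete, here is a small plain-Python imitation of what Step Functions does with a state's input and result. It is only a sketch: `apply_result_path` is a hypothetical helper that handles simple dotted paths, and the sample job name is made up.

```python
import copy


def apply_result_path(state_input, result, result_path):
    """Mimic ResultPath for simple paths of the form '$' or '$.a.b'."""
    if result_path == "$":
        return result  # default behaviour: the result replaces the input entirely
    if not result_path.startswith("$."):
        raise ValueError("only '$' and '$.a.b' style paths are handled in this sketch")

    output = copy.deepcopy(state_input)
    *parents, leaf = result_path[2:].split(".")
    node = output
    for key in parents:
        node = node.setdefault(key, {})
    node[leaf] = result  # the result is inserted alongside the preserved input
    return output


state_input = {"BatchPredJobName": "recommender-batch-transform-001", "DoBatchRecommend": True}
lambda_result = {"Payload": {"ProcessingJobStatus": "InProgress"}}

merged = apply_result_path(state_input, lambda_result, "$.BatchPredLambdaResult")
print(merged["BatchPredJobName"])
print(merged["BatchPredLambdaResult"]["Payload"]["ProcessingJobStatus"])
```

With `result_path='$.BatchPredLambdaResult'`, the next state sees both the original job name and the nested Lambda result, which is why the Choice rules later read the status from `BatchPredLambdaResult.Payload.ProcessingJobStatus`.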

Next, we create a Wait step that blocks execution of the workflow for a certain period of time, a Succeed step that marks the completion of a successful run and a Fail step that will be reached in case of a workflow execution failure.

# Create a wait state for 60s before querying BatchPred every time
check_batch_pred_job_wait_state = steps.states.Wait(
    "Wait-2: 60 secs",
    seconds=60
)
# Create a workflow Success Step
success_step = steps.states.Succeed(
    'Recommender Workflow Succeeded',
    comment='Final state'
)
# Create BatchPred Failure Step
batch_pred_fail_step = steps.states.Fail(
    "Batch Inference Failed",
    comment="Could not generate recommendations"
)

We create a Choice step in order to build a dynamic workflow. This Choice step branches based on the results of our Query Batch Inference Results step: if the Batch Inference job failed, the workflow moves to the Fail step; if it completed, the results are loaded into DynamoDB; and if it is still running, the workflow loops back to query the status again.

# Create a Batch Inference Choice State Step
check_job_choice_batch_pred = steps.states.Choice(
    "Check Batch Inference Status"
)
# Placeholder for the job status returned by the query Lambda
batch_pred_status = lambda_step_batch_pred.output()['BatchPredLambdaResult']['Payload']['ProcessingJobStatus']
batch_pred_failed = steps.choice_rule.ChoiceRule.StringEquals(variable=batch_pred_status, value='Failed')
batch_pred_running = steps.choice_rule.ChoiceRule.StringEquals(variable=batch_pred_status, value='InProgress')
batch_pred_finished = steps.choice_rule.ChoiceRule.StringEquals(variable=batch_pred_status, value='Completed')
# Set up the dynamic workflow
check_job_choice_batch_pred.add_choice(
    rule=batch_pred_running,
    next_step=lambda_step_batch_pred
)
check_job_choice_batch_pred.add_choice(
    rule=batch_pred_failed,
    next_step=batch_pred_fail_step
)
# load_recs_to_ddb_step is defined in the next section; it must exist before this call runs
check_job_choice_batch_pred.add_choice(
    rule=batch_pred_finished,
    next_step=load_recs_to_ddb_step
)
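The query, wait and choice steps together form a polling loop. The sketch below replays that loop in plain Python over a scripted sequence of job statuses, using the state names from this post. The `route` and `simulate` helpers are hypothetical and exist only to illustrate the control flow. Note that a Choice state with no matching rule and no default fails the execution (Step Functions reports this as States.NoChoiceMatched), which the sketch imitates with an exception for statuses such as Stopped.

```python
def route(status):
    """Return the name of the next state for a given ProcessingJobStatus."""
    transitions = {
        "InProgress": "Query Batch Inference Results",
        "Failed": "Batch Inference Failed",
        "Completed": "Load DDB Recommendations",
    }
    if status not in transitions:
        # Imitates a Choice state without a default: nothing matches, so the run errors out.
        raise RuntimeError("no choice rule matched status {!r}".format(status))
    return transitions[status]


def simulate(statuses):
    """Walk the query -> wait -> choice loop over a scripted sequence of job statuses."""
    visited = []
    for status in statuses:
        visited += ["Query Batch Inference Results",
                    "Wait-2: 60 secs",
                    "Check Batch Inference Status"]
        target = route(status)
        if target != "Query Batch Inference Results":
            visited.append(target)
            return visited
    raise RuntimeError("scripted statuses ended while the job was still running")


for state in simulate(["InProgress", "InProgress", "Completed"]):
    print(state)
```

If jobs can be stopped manually, adding a default branch to the Choice step would be one way to route such cases to the Fail step explicitly.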

Loading the recommendations into DynamoDB

Finally, we create a Glue step that runs an AWS Glue job developed using PySpark. The Glue job extracts the recommendations data from the parquet files in S3, transforms the data into a format that can be written to DynamoDB, and then writes the data into DynamoDB. Glue performs this extraction, transformation, and loading (ETL) in a serverless fashion. In order to manage the latency of the DynamoDB writes so that they don’t get throttled even at maximum provisioned throughput, we set WorkerType to ‘G.1X’ and NumberOfWorkers to 2.

# Initialize the name for the Glue job
glue_job_name = 'glue-batch-load-recs'
# Upload the Python script for the Glue job to S3
glue_script_location = S3Uploader.upload(
    local_path='./container/training_code/glue_batch_load_recs.py',
    desired_s3_uri='s3://{}/{}'.format(bucket, project_name),
    session=session)
# Create the Glue job from the S3 location
response = glue_client.create_job(
    Name=glue_job_name,
    Description='PySpark job to extract the parquet data from S3 and load it to DynamoDB',
    Role=glue_role,
    ExecutionProperty={
        'MaxConcurrentRuns': 2
    },
    Command={
        'Name': 'glueetl',
        'ScriptLocation': glue_script_location,
        'PythonVersion': '3'
    },
    DefaultArguments={
        '--job-language': 'python'
    },
    GlueVersion='1.0',
    WorkerType='G.1X',
    NumberOfWorkers=2,
    Timeout=100
)

Create a GlueStartJobRunStep step that encapsulates the Glue job above into the workflow. See the GlueStartJobRunStep Compute step in the AWS Step Functions Data Science SDK documentation.

# Create a batch recommendations generation step with AWS Glue
load_recs_to_ddb_step = steps.GlueStartJobRunStep(
    'Load DDB Recommendations',
    parameters={
        "JobName": execution_input['GlueBatchJobName'],
        "Arguments": {
            '--S3_SOURCE': execution_input['S3RecommendationsPath'],
            '--DDB_DEST': 'recommendations'
        }
    }
)
load_recs_to_ddb_step.next(success_step)

Linking the workflow steps together

Once we have defined our steps, we create our workflow definition by chaining all of the steps together that we’ve created. See Chain in the AWS Step Functions Data Science SDK documentation to learn more.

inference_workflow_definition = steps.Chain([ create_batch_pred_job_step, lambda_step_batch_pred, check_batch_pred_job_wait_state, check_job_choice_batch_pred ])

In the end-to-end workflow that we will eventually build, there could be times when we might not want to run the inference pipeline. To handle graceful execution of subroutines based on the execution input, we would like to create an entrypoint to the inference pipeline that will run only if the boolean parameter corresponding to the inference step is set to True.

inference_entry_checkpoint = steps.states.Choice(
    "Do Inference?"
)
skip_batch_pred = steps.choice_rule.ChoiceRule.BooleanEquals(variable='$.DoBatchRecommend', value=False)
do_batch_pred = steps.choice_rule.ChoiceRule.BooleanEquals(variable='$.DoBatchRecommend', value=True)
inference_entry_checkpoint.add_choice(
    rule=skip_batch_pred,
    next_step=success_step
)
inference_entry_checkpoint.add_choice(
    rule=do_batch_pred,
    next_step=inference_workflow_definition
)

Now that we have set up our inference workflow, we can similarly set up the preprocessing and training workflows and chain all of them together into one end-to-end workflow.

Defining a training step

In the interest of time, we will not go through the preprocessing and training pipelines in full, and will only describe how to create the Training step of the training pipeline. Interested readers can refer to our GitHub repo for the full preprocessing and training pipelines.

We use the Estimator class of SageMaker, which is a high-level interface for defining the training job. We can also pass the hyperparameters and the metrics that we would like to track during training via the Estimator object.

recommender_estimator = sagemaker.estimator.Estimator(
    image_name=training_container_uri,
    role=sagemaker_execution_role,
    train_instance_count=1,
    train_instance_type='ml.p2.8xlarge',
    hyperparameters={'vector_size': 50, 'epoch_count': 40, 'batch_value': 32768},
    output_path='s3://{}/{}/data/model'.format(bucket, project_name),
    metric_definitions=[{'Name': 'train:loss', 'Regex': '.*loss:\\s*(\\S+).*'}],
    enable_sagemaker_metrics=True,
    input_mode='File'
)
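The metric definition works by applying the regular expression to each line the training container logs and capturing the first group as the metric value. The snippet below applies the same pattern with Python's `re` module to a few made-up log lines, to show what would be captured as train:loss. The log format and the `extract_loss` helper are assumptions for illustration.

```python
import re

# Same pattern as the 'Regex' entry in metric_definitions above.
LOSS_PATTERN = re.compile(r'.*loss:\s*(\S+).*')


def extract_loss(log_line):
    """Return the captured loss as a float, or None if the line carries no loss value."""
    match = LOSS_PATTERN.match(log_line)
    return float(match.group(1)) if match else None


# Hypothetical log lines; the real training script's output format may differ.
sample_logs = [
    "epoch 1/40 - loss: 0.6931",
    "epoch 2/40 - loss: 0.5120 - elapsed: 12s",
    "saving checkpoint",
]

print([extract_loss(line) for line in sample_logs])
```

Since the group `(\S+)` captures everything up to the next whitespace, the training script should print the loss as a bare number followed by a space or the end of the line. A trailing comma, for example, would end up inside the captured value.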

This is followed by creating the Training step and passing the estimator object we instantiated above. See TrainingStep in the AWS Step Functions Data Science SDK documentation to learn more.

training_step = steps.TrainingStep(
    'Train Model',
    data={
        'training': s3_input(execution_input['S3PreprocessedPath']),
    },
    estimator=recommender_estimator,
    job_name=execution_input['TrainingJobName'],
    wait_for_completion=True,
    result_path='$.TrainingJobResults'
)

📦 Creating the end-to-end workflow

When we have defined all our steps and the control and data flow between them, we can instantiate the end-to-end workflow.

e2e_workflow = Workflow(
    name='End2End-Routine-{}'.format(id),
    definition=steps.Chain([preprocessing_entry_checkpoint]),
    role=workflow_execution_role,
    execution_input=execution_input
)
e2e_workflow.render_graph()
e2e_workflow.create()

Figure 3 shows our Step Functions workflow with multiple executions in the past.

Figure 3: AWS Step Functions console showing the workflow

The JSON representation helps us understand the definition of the states and the JSON paths that define the flow of events in the state machine.

If not for the abstraction of a state machine, debugging a distributed system, its microservices and all the convoluted connections would be time-consuming.

A snapshot of the state machine of our Step Functions workflow in its Amazon States Language representation looks like this:

{
"States": {
"Do Inference?": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.DoBatchRecommend",
"BooleanEquals": false,
"Next": "Recommender Workflow Succeeded"
},
{
"Variable": "$.DoBatchRecommend",
"BooleanEquals": true,
"Next": "Create Batch Inference Job"
}
]
},
"Create Batch Inference Job": {
"Parameters": {
"FunctionName.$": "$$.Execution.Input['CreateBatchPredLambdaFunctionName']",
"Payload": {
"Configuration": {
"JobName.$": "$$.Execution.Input['BatchPredJobName']",
"IAMRole": "arn:aws:iam::987654321:role/sagemakerexecutionrole",
"LocalStorageSizeGB": 50,
"S3InputDataPathModelData.$": "$$.Execution.Input['S3ModelPath']",
"S3OutputDataPath.$": "$$.Execution.Input['S3RecommendationsPath']",
"EcrContainerUri": "987654321.dkr.ecr.ap-northeast-1.amazonaws.com/reciprocal-recommender:latest"
}
}
},
"ResultPath": "$.CreateBatchPredLambdaResult",
"Resource": "arn:aws:states:::lambda:invoke",
"Type": "Task",
"Next": "Query Batch Inference Results"
},
"Query Batch Inference Results": {
"Parameters": {
"FunctionName.$": "$$.Execution.Input['ProcessingLambdaFunctionName']",
"Payload": {
"ProcessingJobName.$": "$.BatchPredJobName"
}
},
"ResultPath": "$.BatchPredLambdaResult",
"Resource": "arn:aws:states:::lambda:invoke",
"Type": "Task",
"Next": "Wait-2: 60 secs"
},
"Wait-2: 60 secs": {
"Seconds": 60,
"Type": "Wait",
"Next": "Check Batch Inference Status"
},
"Check Batch Inference Status": {
"Type": "Choice",
"Choices": [
{
"Variable": "$['BatchPredLambdaResult']['Payload']['ProcessingJobStatus']",
"StringEquals": "InProgress",
"Next": "Query Batch Inference Results"
},
{
"Variable": "$['BatchPredLambdaResult']['Payload']['ProcessingJobStatus']",
"StringEquals": "Failed",
"Next": "Batch Inference Failed"
},
{
"Variable": "$['BatchPredLambdaResult']['Payload']['ProcessingJobStatus']",
"StringEquals": "Completed",
"Next": "Load DDB Recommendations"
}
]
},
"Batch Inference Failed": {
"Comment": "Could not generate recommendations",
"Type": "Fail"
},
"Load DDB Recommendations": {
"Parameters": {
"JobName.$": "$$.Execution.Input['GlueBatchJobName']",
"Arguments": {
"--S3_SOURCE.$": "$$.Execution.Input['S3RecommendationsPath']",
"--DDB_DEST": "recommendations"
}
},
"Resource": "arn:aws:states:::glue:startJobRun.sync",
"Type": "Task",
"Next": "Recommender Workflow Succeeded"
},
"Train Model": {
"ResultPath": "$.TrainingJobResults",
"Resource": "arn:aws:states:::sagemaker:createTrainingJob.sync",
"Parameters": {
"AlgorithmSpecification": {
"TrainingImage": "987654321.dkr.ecr.ap-northeast-1.amazonaws.com/reciprocal-recommender:latest",
"TrainingInputMode": "File"
},
"OutputDataConfig": {
"S3OutputPath": "s3://sagemaker-ap-northeast-1-987654321/ml_deploy/data/model"
},
"StoppingCondition": {
"MaxRuntimeInSeconds": 86400
},
"ResourceConfig": {
"InstanceCount": 1,
"InstanceType": "ml.p2.8xlarge",
"VolumeSizeInGB": 30
},
"RoleArn": "arn:aws:iam::987654321:role/service-role/AmazonSageMaker-ExecutionRole-20200220T987654",
"InputDataConfig": [
{
"DataSource": {
"S3DataSource": {
"S3DataType": "S3Prefix",
"S3Uri.$": "$$.Execution.Input['S3PreprocessedPath']",
"S3DataDistributionType": "FullyReplicated"
}
},
"ChannelName": "training"
}
],
"HyperParameters": {
"vector_size": "50",
"epoch_count": "40",
"batch_value": "32768"
},
"TrainingJobName.$": "$$.Execution.Input['TrainingJobName']"
},
"Type": "Task",
"Next": "Save Model"
}
}
}

The entire state machine definition corresponding to the end-to-end workflow, both as a visual representation and as Amazon States Language code can be viewed in the Step Functions console as shown in Figure 4.

Figure 4: A running execution of the workflow
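Because the definition is plain JSON, it is also easy to inspect programmatically. As a small illustration, the sketch below lists the transitions in a definition and flags any target state that is referenced but not defined, which is a common mistake when wiring steps by hand. The `transitions` and `undefined_targets` helpers are hypothetical, the sample is a trimmed-down fragment of the snapshot above, and nested states inside Parallel or Map states are not handled.

```python
import json


def transitions(definition):
    """Return (source, target) pairs for Next, Choices and Default transitions."""
    edges = []
    for name, state in definition["States"].items():
        if "Next" in state:
            edges.append((name, state["Next"]))
        for choice in state.get("Choices", []):
            edges.append((name, choice["Next"]))
        if "Default" in state:
            edges.append((name, state["Default"]))
    return edges


def undefined_targets(definition):
    """Return the sorted names of states that are referenced but never defined."""
    states = definition["States"]
    return sorted({target for _, target in transitions(definition) if target not in states})


# Trimmed-down fragment of the snapshot above, for illustration only.
sample = json.loads("""
{
  "States": {
    "Wait-2: 60 secs": {"Type": "Wait", "Seconds": 60, "Next": "Check Batch Inference Status"},
    "Check Batch Inference Status": {
      "Type": "Choice",
      "Choices": [
        {"Variable": "$.BatchPredLambdaResult.Payload.ProcessingJobStatus",
         "StringEquals": "InProgress", "Next": "Query Batch Inference Results"},
        {"Variable": "$.BatchPredLambdaResult.Payload.ProcessingJobStatus",
         "StringEquals": "Failed", "Next": "Batch Inference Failed"}
      ]
    },
    "Batch Inference Failed": {"Type": "Fail"}
  }
}
""")

for source, target in transitions(sample):
    print("{} -> {}".format(source, target))
print("undefined:", undefined_targets(sample))
```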

🏁 Executing the end-to-end workflow

Every time we want to run the end-to-end retraining, inference and deployment workflow, we trigger an execution based on the definition of the input parameters we created earlier.

In our setup, this is usually done by a Lambda function that monitors an S3 location for input files. Whenever a new set of input files is dropped in, the Lambda function calls the execute method of our workflow.

execution = e2e_workflow.execute(
    inputs={
        'ProcessingLambdaFunctionName': processing_function_name,
        'CreatePreprocessingLambdaFunctionName': create_preprocessing_function_name,
        'CreateBatchPredLambdaFunctionName': create_batch_pred_function_name,
        'PreprocessingJobName': 'user-transform-etl-{}'.format(id),
        'TrainingJobName': 'train-{}'.format(id),
        'ModelName': 'recommender-model-{}'.format(id),
        'EndpointName': endpoint_name,
        'BatchPredJobName': 'recommender-batch-transform-{}'.format(id),
        'GlueBatchJobName': glue_job_name,
        'S3ModelPath': 's3://{}/{}/data/model/train-{}/output'.format(bucket, project_name, id),
        'S3PreprocessedPath': 's3://{}/{}/data/train/preprocessed-{}'.format(bucket, project_name, id),
        'S3RecommendationsPath': 's3://{}/{}/data/output/recommendations-{}'.format(bucket, project_name, id),
        'DoPreprocessing': True,
        'DoTraining': True,
        'DoBatchRecommend': True,
        'CreateNewEndpoint': True
    }
)
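For the S3-triggered variant, a Lambda handler along the following lines could start the same state machine. This is a sketch rather than our production function: it calls the Step Functions `start_execution` API through boto3 instead of the SDK's execute method, only a subset of the input keys is filled in, and the `STATE_MACHINE_ARN` and `PROJECT_NAME` environment variables as well as `build_execution_input` are hypothetical names.

```python
import json
import os
import uuid
from urllib.parse import unquote_plus


def build_execution_input(bucket, project_name, run_id):
    """Build a (shortened) execution input; a real run needs every key in the schema."""
    return {
        "TrainingJobName": "train-{}".format(run_id),
        "S3PreprocessedPath": "s3://{}/{}/data/train/preprocessed-{}".format(
            bucket, project_name, run_id),
        "DoPreprocessing": True,
        "DoTraining": True,
        "DoBatchRecommend": True,
    }


def lambda_handler(event, context, sfn_client=None):
    """Start one workflow execution for the S3 object that triggered this function."""
    if sfn_client is None:
        import boto3  # imported lazily so the handler can be tested with a stub client
        sfn_client = boto3.client("stepfunctions")

    record = event["Records"][0]["s3"]
    bucket = record["bucket"]["name"]
    key = unquote_plus(record["object"]["key"])  # object keys in S3 events are URL-encoded

    run_id = uuid.uuid4().hex[:8]
    project_name = os.environ.get("PROJECT_NAME", "ml_deploy")  # assumed default
    response = sfn_client.start_execution(
        stateMachineArn=os.environ["STATE_MACHINE_ARN"],
        name="run-{}".format(run_id),
        input=json.dumps(build_execution_input(bucket, project_name, run_id)),
    )
    return {"executionArn": response["executionArn"],
            "trigger": "s3://{}/{}".format(bucket, key)}
```

Generating a fresh `run_id` per invocation keeps job names and S3 prefixes unique across executions, in the same spirit as the `id` suffix used in the inputs above.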

Figure 5 shows a completed run of the end-to-end pipeline. From the metrics logged for every step of the workflow, we can analyze the time taken by the individual microservices, performance bottlenecks and quickly iterate on building a robust pipeline.

Figure 5: AWS Step Functions workflow
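The same timing analysis can be done offline from the execution history. The sketch below sums the time spent in each state from a list of events shaped like those returned by the Step Functions `get_execution_history` API (a timestamp plus `stateEnteredEventDetails` or `stateExitedEventDetails`). The `state_durations` helper and the sample events are made up for illustration.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def state_durations(events):
    """Sum the seconds spent in each state from get_execution_history-style events."""
    entered_at = {}
    totals = defaultdict(float)
    for event in sorted(events, key=lambda e: e["timestamp"]):
        if "stateEnteredEventDetails" in event:
            entered_at[event["stateEnteredEventDetails"]["name"]] = event["timestamp"]
        elif "stateExitedEventDetails" in event:
            name = event["stateExitedEventDetails"]["name"]
            if name in entered_at:
                elapsed = event["timestamp"] - entered_at.pop(name)
                totals[name] += elapsed.total_seconds()  # states visited in a loop accumulate
    return dict(totals)


# Synthetic events; real ones would come from the execution history of a run.
start = datetime(2021, 9, 27, 0, 0, 0)
sample_events = [
    {"timestamp": start, "type": "TaskStateEntered",
     "stateEnteredEventDetails": {"name": "Train Model"}},
    {"timestamp": start + timedelta(minutes=90), "type": "TaskStateExited",
     "stateExitedEventDetails": {"name": "Train Model"}},
    {"timestamp": start + timedelta(minutes=90), "type": "WaitStateEntered",
     "stateEnteredEventDetails": {"name": "Wait-2: 60 secs"}},
    {"timestamp": start + timedelta(minutes=91), "type": "WaitStateExited",
     "stateExitedEventDetails": {"name": "Wait-2: 60 secs"}},
]

print(state_durations(sample_events))
```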
