Simple MLOps #3: Inference Pipeline

Tales Marra
7 min read · Oct 29, 2023


In this third article of the Simple MLOps series, we’ll cover the step I imagine most of you are interested in: the inference pipeline. But don’t fool yourself: having the other steps properly done (the feature/continuous training pipeline and the model registry) is crucial to ensuring the inference process goes smoothly.

Following our philosophy on the series, we’ll strive for simplicity, while building a system that can be put to production.

Without further ado, let’s go through the architecture.

Understanding the inference pipeline components

Our architecture will again be serverless: a Lambda function coupled with an API Gateway, which gives us an API-based prediction service.

The architecture of the prediction API

The inference function must be able to retrieve the latest model version from the model registry versioning table and pull the corresponding model object from storage. It then reads the payload received through the API and runs the prediction on it. The result is then returned to the requester via the API.

Creating an ECR repository to store Lambda image

Like we did for the training pipeline, the first thing we need to do is to create the ECR repository to store the image for our inference function. It’s recommended that you use a different repository than the one for the training function.

aws ecr create-repository \
--repository-name inference-image-repo \
--image-scanning-configuration scanOnPush=true \
--region region

Setting up .env file

To ensure the security of your credentials, we’ll use a .env file to load environment variables. We add the inference-related variables to the training ones already set, so the final file will look like this:

AWS_REGION=(YOUR AWS REGION)
AWS_CT_ECR_REPO=(YOUR ECR REPO)
CT_FUNCTION_NAME=ct-function
AWS_INF_ECR_REPO=(YOUR NEWLY CREATED REPOSITORY FOR INFERENCE)
INF_FUNCTION_NAME=inference-function

The inference function

We’ll now dive into the code of the inference function itself. As we discussed earlier, we’ll mostly need to implement three things.

  • Retrieving the latest model tag from the versioning table: We’ll use boto3 to do it, performing a scan on the table and then sorting the items by id. Notice that a full scan is not the most efficient approach for a large table, but for the sake of simplicity we’ll do it like this. The code that allows us to do it is here (it lives inside the Lambda handler, hence the return):
import json
import boto3

table_name = 'simple-registry'
# use dynamodb as the service
dynamodb = boto3.resource('dynamodb')
# get a table object
table = dynamodb.Table(table_name)
# perform a scan on it. this retrieves the rows from the table
response = table.scan()

if 'Items' in response and len(response['Items']) > 0:
    # sort by id in reverse and keep the tag of the first (latest) item
    tag_value = sorted(response['Items'], key=lambda x: x['id'], reverse=True)[0]['tag']
    print("Latest_tag_value: ", tag_value)
else:
    # no model has been registered yet
    return {
        'statusCode': 404,
        'body': json.dumps('No models found')
    }
  • Getting the latest model object from S3: Once we have the latest model tag, we can simply pull the matching object from S3. The code to do it is the following:
import pickle

s3 = boto3.client('s3')
# download the pickled model that matches the latest tag
obj = s3.get_object(Bucket='registry-bucket-simple-ct', Key=f'model_{tag_value}.pkl')
# deserialize it back into a model object
model = pickle.loads(obj['Body'].read())
  • Finally we call the predict method on the data. Notice the importance of using pipelines here. Because we saved all the pre-processing transformations together with the model, we don’t need to worry about anything: the model will take care of the pre-processing as well.
import pandas as pd

# the request body arrives as a JSON string
payload = json.loads(event['body'])
# wrap the payload in a one-row DataFrame and take the single prediction
preds = model.predict(pd.DataFrame([payload]))[0]
return {
    'statusCode': 200,
    'body': json.dumps(str(preds))
}
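The three steps above can be assembled into a single handler. What follows is a minimal sketch under stated assumptions, not the code from the repository: the helper names (scan_all, latest_tag, load_model, predict) are hypothetical, and the paginated scan is an addition of mine. A single DynamoDB scan call returns at most 1 MB of data and reports a LastEvaluatedKey when more items remain, so the loop keeps scanning until that key is gone.

```python
import json
import pickle


def scan_all(table):
    """Collect every item from a DynamoDB table, following scan pagination."""
    items = []
    kwargs = {}
    while True:
        response = table.scan(**kwargs)
        items.extend(response.get("Items", []))
        last_key = response.get("LastEvaluatedKey")
        if last_key is None:
            return items
        # Resume the scan where the previous page stopped.
        kwargs["ExclusiveStartKey"] = last_key


def latest_tag(items):
    """Return the tag of the item with the highest id, or None if there are none."""
    if not items:
        return None
    return max(items, key=lambda item: item["id"])["tag"]


def load_model(s3, bucket, tag):
    """Download and unpickle the model object stored under the given tag."""
    obj = s3.get_object(Bucket=bucket, Key=f"model_{tag}.pkl")
    return pickle.loads(obj["Body"].read())


def predict(event, table, s3, bucket, to_frame):
    """Run the three steps: find the latest tag, load the model, predict."""
    tag = latest_tag(scan_all(table))
    if tag is None:
        return {"statusCode": 404, "body": json.dumps("No models found")}
    model = load_model(s3, bucket, tag)
    payload = json.loads(event["body"])
    preds = model.predict(to_frame([payload]))[0]
    return {"statusCode": 200, "body": json.dumps(str(preds))}


def lambda_handler(event, context):
    # Imported here so the helpers above stay importable without AWS libraries.
    import boto3
    import pandas as pd

    table = boto3.resource("dynamodb").Table("simple-registry")
    s3 = boto3.client("s3")
    return predict(event, table, s3, "registry-bucket-simple-ct", pd.DataFrame)
```

Passing the table and the S3 client as arguments is only a design choice for this sketch: it lets you exercise the logic locally with fake objects before building the image.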

Packaging code and dependencies using Docker

Docker is used to package our code and the required dependencies. We’ll build a Docker image that installs our requirements and packages the code. Similarly to what we did for the training pipeline, we’ll build, tag and push the Docker image to the inference image repository.

FROM public.ecr.aws/lambda/python:3.8
# Install the function's dependencies using file requirements.txt
# from your project folder.
COPY requirements.txt .
RUN pip3 install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"
# Copy function code to /var/task
COPY lambda_handler.py ${LAMBDA_TASK_ROOT}
# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
CMD [ "lambda_handler.lambda_handler" ]

Placing the Docker image in the cloud

If you’ve followed my advice and installed Just, you can go ahead and just execute the following:

just build-inf-image
just tag-inf-image
just push-inf-image

Building the Infrastructure

We’ll now use Terraform to build the necessary infrastructure.

Building the function

Similarly to what we did on the training pipeline, we’ll declare a lambda function resource, along with a role to which we’ll add permissions later on.

# Declare the ECR repository you've created previously
data "aws_ecr_repository" "inference_image_repo" {
  name = "inference-image-repo"
}

# create a new role for the lambda function
resource "aws_iam_role" "simple_inference_role" {
  name = "simple-inference-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = "sts:AssumeRole",
        Effect = "Allow",
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })
}

# declare a lambda function resource
resource "aws_lambda_function" "simple-inference" {
  function_name = "inference-function"
  role          = aws_iam_role.simple_inference_role.arn
  # notice we use the image from the repository for the lambda function
  image_uri    = "${data.aws_ecr_repository.inference_image_repo.repository_url}:latest"
  package_type = "Image"
  timeout      = 900
  memory_size  = 128
  depends_on = [
    aws_iam_role_policy_attachment.cloudwatch_logs_attachment,
    aws_cloudwatch_log_group.simple_inference_log_group,
  ]
}

Defining the interactions

The inference function will need read permissions both for S3 and DynamoDB in order to get model objects and versions. The way to give those permissions in AWS is to create a policy, and then attach the policy to the role of the resource, in our case, the role of the inference function we have just declared.


# create a policy to read from the dynamodb table
resource "aws_iam_policy" "dynamodb_access_policy_inference" {
  name        = "dynamodb-access-policy-inference"
  description = "IAM policy for DynamoDB access"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = [
          "dynamodb:GetItem",
          "dynamodb:Scan",
          "dynamodb:Query"
        ],
        Effect   = "Allow",
        Resource = data.aws_dynamodb_table.simple_registry.arn,
      },
    ],
  })
}

# attach the policy to the role
resource "aws_iam_role_policy_attachment" "dynamodb_access_attachment" {
  policy_arn = aws_iam_policy.dynamodb_access_policy_inference.arn
  role       = aws_iam_role.simple_inference_role.name
}

# create a policy to read from the s3 bucket
resource "aws_iam_policy" "s3_access_policy_inf" {
  name        = "s3-access-policy-inf"
  description = "IAM policy for S3 access for inference"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = ["s3:GetObject"],
        Effect = "Allow",
        Resource = [
          "${data.aws_s3_bucket.model_registry_bucket.arn}/*",
        ],
      },
    ],
  })
}

# attach the policy to the role
resource "aws_iam_role_policy_attachment" "s3_access_attachment" {
  policy_arn = aws_iam_policy.s3_access_policy_inf.arn
  role       = aws_iam_role.simple_inference_role.name
}

Creating the API

Now we need to set up the API and the methods which will be used to call our function. The API therefore needs permission to invoke the Lambda function on our behalf. The following code does exactly that.

# create an HTTP API gateway for the lambda function
resource "aws_apigatewayv2_api" "simple_inference_api" {
  name          = "simple-inference-api"
  protocol_type = "HTTP"
}

resource "aws_lambda_permission" "apigw_lambda_permission" {
  statement_id  = "AllowAPIGatewayInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.simple-inference.function_name
  principal     = "apigateway.amazonaws.com"
  source_arn    = "${aws_apigatewayv2_api.simple_inference_api.execution_arn}/*/*"
}

resource "aws_apigatewayv2_integration" "simple_inference_integration" {
  api_id             = aws_apigatewayv2_api.simple_inference_api.id
  integration_type   = "AWS_PROXY"
  integration_uri    = aws_lambda_function.simple-inference.invoke_arn
  integration_method = "POST"
}

resource "aws_apigatewayv2_stage" "simple_inference_stage" {
  api_id      = aws_apigatewayv2_api.simple_inference_api.id
  name        = "simple-inference-stage"
  auto_deploy = true
}

We’ll now add a new route to the API that will be attached to the function. Notice that we add it on POST, so that we can send a payload along with it.

resource "aws_apigatewayv2_route" "simple_inference_route" {
  api_id    = aws_apigatewayv2_api.simple_inference_api.id
  route_key = "POST /inference"
  target    = "integrations/${aws_apigatewayv2_integration.simple_inference_integration.id}"
}
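On the function side, a Lambda proxy integration delivers the POST payload as a string in event['body'], and API Gateway sets isBase64Encoded to true when it has base64-encoded that body. Here is a small sketch of how the handler could read the payload defensively. The helper names parse_body and bad_request are hypothetical and not part of the original handler, which calls json.loads(event['body']) directly.

```python
import base64
import json


def parse_body(event):
    """Return the JSON object sent by the client, or raise ValueError."""
    body = event.get("body")
    if body is None:
        raise ValueError("request has no body")
    if event.get("isBase64Encoded"):
        # API Gateway may hand the body over base64-encoded.
        body = base64.b64decode(body).decode("utf-8")
    payload = json.loads(body)  # json.JSONDecodeError is a ValueError
    if not isinstance(payload, dict):
        raise ValueError("expected a JSON object")
    return payload


def bad_request(message):
    """Build a 400 response in the shape the proxy integration expects."""
    return {"statusCode": 400, "body": json.dumps(message)}
```

Inside the handler you could wrap the call in a try/except ValueError block and return bad_request(str(error)), so that a malformed payload produces a 400 instead of an unhandled exception.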

Logging

We’ll now add CloudWatch logging to our function, which can be used both for debugging and for more advanced monitoring, something we are going to see in the next article of the series.

# create a cloudwatch log group for the lambda function
resource "aws_cloudwatch_log_group" "simple_inference_log_group" {
  name              = "/aws/lambda/inference-function"
  retention_in_days = 7
}


# create a policy that allows writing to CloudWatch Logs
resource "aws_iam_policy" "cloudwatch_logs_policy" {
  name        = "cloudwatch-logs-policy"
  description = "IAM policy for CloudWatch Logs access"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action   = ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
        Effect   = "Allow",
        Resource = ["arn:aws:logs:*:*:*"]
      },
    ],
  })
}

# attach the policy to the role
resource "aws_iam_role_policy_attachment" "cloudwatch_logs_attachment" {
  policy_arn = aws_iam_policy.cloudwatch_logs_policy.arn
  role       = aws_iam_role.simple_inference_role.name
}
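With the log group and permissions in place, whatever the function prints or logs ends up in CloudWatch. As a sketch of what the function could emit, the snippet below writes one JSON line per prediction using the standard logging module. The logger name "inference", the log_prediction helper and the field names are assumptions for illustration, not part of the original function, which uses a plain print.

```python
import json
import logging

# Hypothetical logger name; records propagate to the handler that the
# Lambda Python runtime attaches to the root logger.
logger = logging.getLogger("inference")
logger.setLevel(logging.INFO)


def log_prediction(tag, payload, prediction, log=logger):
    """Emit one structured log line describing a prediction."""
    record = {
        "event": "prediction",
        "model_tag": tag,
        "features": payload,
        "prediction": str(prediction),
    }
    # default=str keeps non-JSON types (for example Decimal) from raising.
    log.info(json.dumps(record, default=str))
    return record
```

One JSON object per line keeps the logs easy to filter later on. If the payload can contain personal data, consider logging only the fields you actually need.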

Deploying the infrastructure

The final step consists in planning and applying the infrastructure changes. Just use the command below, and you’ll deploy everything we have set up to AWS!

terraform plan && terraform apply

Or, just deploy your infra with:

just deploy-inference

Testing the prediction service

Everything looks good! Now let’s test the prediction service we have created! You can go to AWS, take the endpoint of your API, and call it using cURL or Postman! Remember that the route we defined is POST /inference on the simple-inference-stage stage.

You can use this payload:

{
"age": 19,
"sex": "female",
"bmi": 27.9,
"children": 0,
"smoker": "yes",
"region": "southwest"
}
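If you would rather call the API from Python, here is a minimal sketch using only the standard library. The base URL below is a placeholder you need to replace with the invoke URL of your own stage, and the function names build_request and call_prediction_api are hypothetical.

```python
import json
import urllib.request


def build_request(base_url, payload):
    """Build a POST request for the /inference route of the API."""
    url = base_url.rstrip("/") + "/inference"
    data = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def call_prediction_api(base_url, payload, timeout=30):
    """Send the payload and return the decoded JSON response body."""
    request = build_request(base_url, payload)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


# Example usage (placeholder URL, replace with your own stage invoke URL):
# base_url = "https://<api-id>.execute-api.<region>.amazonaws.com/simple-inference-stage"
# sample = {"age": 19, "sex": "female", "bmi": 27.9,
#           "children": 0, "smoker": "yes", "region": "southwest"}
# print(call_prediction_api(base_url, sample))
```

Since the function returns the prediction as a JSON-encoded string, call_prediction_api gives you back a Python string that you can convert to a float if needed.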

Conclusion

And BAM! You’ve got yourself a prediction pipeline, coupled with a continuous training pipeline and a model registry! This is already a great project to showcase in your portfolio. And we are going to make it even better!

The code for this tutorial is in my GitHub.

Hit that follow button to stay updated on future explorations! 🔥🤖 Let’s continue this journey of coding, machine learning, and MLOps.
