Deploy and Scale-Test a PyTorch SetFit Model in SageMaker

Nghodki
7 min read · Apr 22, 2024


We will discuss how to deploy a PyTorch SetFit model on SageMaker, load-test the model endpoint, and scale it. Once we have a trained model that needs to be deployed to SageMaker Inference, the steps below can be followed.

Deploy a PyTorch SetFit model on SageMaker:

There are two ways to deploy models to SageMaker; we will follow the one using the SageMaker Python SDK.

For the script that deploys the model to SageMaker, we used sagemaker.pytorch.model:

from sagemaker.pytorch.model import PyTorchModel

Let's create a client with this class. The client needs model_data (usually the S3 path of the archived, trained model package), an IAM role with permissions to create the model, endpoint configuration, and endpoint, plus the PyTorch framework version and Python version used to train the model.

Note: The archive for the model package should not be created over the folder that contains the package, but from within the package folder itself, so the files sit at the root of the archive. For example, your package may contain files like the following.
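
The exact file names depend on your SetFit and transformers versions; a typical package for this setup (the code/ folder is described in the next section) might look roughly like:

model.safetensors        # model weights (name varies by SetFit/transformers version)
config.json              # model configuration
model_head.pkl           # SetFit classification head (if applicable)
code/
    inference.py         # inference handler (shown below)
    requirements.txt     # extra pip dependencies needed at inference time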

Create the archive from the path where these files reside with a command similar to:

tar -cvzf model.tar.gz ./
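
You can sanity-check that the files sit at the root of the archive (and not nested under an extra folder) by listing its contents:

tar -tzf model.tar.gz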

Once we have these details, we can create the client as below (more details about the class can be found in the SageMaker Python SDK documentation):

pt_model = PyTorchModel(
    model_data=s3_uri,        # S3 path where the trained model package is uploaded
    role=role,                # IAM role the SageMaker endpoint will use
    framework_version="2.0",  # PyTorch version used
    py_version='py310',       # Python version used
)

When creating the inference endpoint, the model package should include an inference script that invokes the model. This inference script can look like the following:

import ast

from sagemaker_inference import encoder, decoder
from setfit import SetFitModel


def model_fn(model_dir):
    model = SetFitModel.from_pretrained(model_dir)
    model.to('cuda')
    print(f"model loaded successfully {model}")
    return model


def input_fn(input_data, content_type):
    """A default input_fn that can handle JSON, CSV and NPZ formats.

    Args:
        input_data: the request payload serialized in the content_type format
        content_type: the request content_type

    Returns: input_data deserialized into the expected format. Currently the expected
        format is {"inputs": ["q1", "q2", ...]}
    """
    decoded = None
    try:
        print(f"input_data: {input_data}, content_type: {content_type}")
        decoded = decoder.decode(input_data, content_type)
        print(f"decoded input: {decoded}, content_type: {content_type}")
        return ast.literal_eval(str(decoded))
    except Exception as e:
        print(f"invalid input. input: {decoded}, error: {e}")
        raise e


def output_fn(prediction, accept):
    """A default output_fn for PyTorch. Serializes predictions from predict_fn to JSON, CSV or NPY format.

    Args:
        prediction: a prediction result from predict_fn
        accept: type which the output data needs to be serialized into

    Returns: output data serialized
    """
    print(f"prediction: {prediction}, prediction type: {type(prediction)}, accept: {accept}")
    encoded = encoder.encode(prediction, accept)
    print(f"encoded output: {encoded}, content_type: {accept}")
    return encoded


def predict_fn(data, model):
    """A default predict_fn for PyTorch. Calls the model on data deserialized by input_fn.
    Runs prediction on GPU if cuda is available.

    Args:
        data: input data for prediction deserialized by input_fn
        model: PyTorch model loaded in memory by model_fn

    Returns: a prediction
    """
    try:
        print(f"data: {data}, data_type: {type(data)}")
        inputs = data.get("inputs", None)
        if inputs is None:
            raise Exception(f"\"inputs\" not found: {data}")
        return model.predict(inputs)
    except Exception as e:
        print(f"predict_fn error: {e}")
        raise e

This file should live in a folder named code inside the package. Note that the script explicitly uses cuda (GPU) for compute, which matches the GPU instance type we deploy on below.
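
The stock SageMaker PyTorch inference container does not ship with setfit, so code/requirements.txt should pull it in. A minimal sketch (pin the version you trained with):

# code/requirements.txt
setfit   # optionally pin, e.g. setfit==<version used for training>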

Before deploying the model to a SageMaker endpoint, make sure this file is present inside a folder called code. We can use the functions below to verify this:

# Download the model package
def download_model(s3_uri):
    s3 = boto3.client('s3')
    bucket = s3_uri.split('/')[2]
    key = '/'.join(s3_uri.split('/')[3:])
    model = key.split('/')[-1]
    s3.download_file(bucket, key, model)
    print("Model package downloaded successfully")
    return model

# Unzip the model package into the folder model_to_deploy without printing the output
def silent_unzip(model):
    os.system("mkdir model_to_deploy")
    os.system("tar -xzf " + model + " -C model_to_deploy")
    print("Model package unzipped successfully")

# Delete the downloaded model package and the model_to_deploy folder
def delete_files():
    os.system("rm -rf " + model)
    os.system("rm -rf model_to_deploy")

# Check that inference.py, requirements.txt and a code directory exist in the extracted directory or its subfolders
def check_files(directory):
    check = 0
    for root, dirs, files in os.walk(directory):
        if 'inference.py' in files and 'requirements.txt' in files:
            check += 1
        if 'code' in dirs:
            check += 1
    if check == 2:
        return True
    else:
        return False

model = download_model(s3_uri)
silent_unzip(model)

# Verify that the code/ directory (with inference.py and requirements.txt) exists in the extracted package
if check_files("model_to_deploy"):
    print("Code directory includes inference.py and requirements.txt")
    delete_files()
else:
    print("Code directory does not exist")
    print("Exiting")
    delete_files()
    exit(1)

Let's use the deploy method on the client created earlier to deploy the model:

predictor = pt_model.deploy(
    endpoint_name=endpoint_name,
    initial_instance_count=3,
    instance_type="ml.g4dn.xlarge"
)

We used ml.g4dn.xlarge because, according to AWS documentation, PyTorch models have been tested on a set of instance types, and ml.g4dn.xlarge (a GPU instance) is one of them.

Once the model is deployed, let's verify that the endpoint is in service and working as expected by querying it:

def query_endpoint(payload: dict, endpoint_name: str) -> dict:
    client = boto3.client("sagemaker-runtime", region_name=os.environ["AWS_DEFAULT_REGION"])
    response = client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="application/json",
        Body=json.dumps(payload),
        CustomAttributes="accept_eula=true",
    )
    response = response["Body"].read().decode("utf8")
    response = json.loads(response)
    return response

response = query_endpoint(<<Format for input as stated during training>>, endpoint_name)
print(response)
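
If you want to confirm the endpoint has finished creating before querying it, here is a minimal sketch using the boto3 SageMaker client (assuming AWS_DEFAULT_REGION is set as above):

import os
import boto3

def wait_until_in_service(endpoint_name: str) -> str:
    sm = boto3.client("sagemaker", region_name=os.environ["AWS_DEFAULT_REGION"])
    # The built-in waiter polls DescribeEndpoint until the endpoint is InService (or creation fails)
    sm.get_waiter("endpoint_in_service").wait(EndpointName=endpoint_name)
    status = sm.describe_endpoint(EndpointName=endpoint_name)["EndpointStatus"]
    print(f"Endpoint {endpoint_name} status: {status}")
    return status

The deploy() call above already waits for the endpoint by default, so this check is mostly useful when the deployment is triggered elsewhere (for example, from a pipeline).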

Putting it all together, we get:

import os
import json
import boto3
import argparse
from sagemaker.pytorch.model import PyTorchModel

# Argument parser
parser = argparse.ArgumentParser(
    prog='Deploy Model to Sagemaker',
    description='This script deploys the model to sagemaker',
    epilog='Usage: python3 deploymodel.py --model_path <path to model.tar.gz> --endpoint_name <name of endpoint> --region <aws region> --role <role name>')

parser.add_argument('--model_path', type=str, help='Path to model', required=True)
parser.add_argument('--endpoint_name', type=str, help='Name of endpoint', required=True)
parser.add_argument('--region', type=str, help='AWS region', default='us-west-2')
parser.add_argument('--role', type=str, help='Role name', required=True)
args = parser.parse_args()

# Set environment variables and resolve inputs
os.environ["AWS_DEFAULT_REGION"] = args.region
endpoint_name = args.endpoint_name
s3_uri = args.model_path
iam_client = boto3.client('iam')
role = iam_client.get_role(RoleName=args.role)['Role']['Arn']

# Download the model package
def download_model(s3_uri):
    s3 = boto3.client('s3')
    bucket = s3_uri.split('/')[2]
    key = '/'.join(s3_uri.split('/')[3:])
    model = key.split('/')[-1]
    s3.download_file(bucket, key, model)
    print("Model package downloaded successfully")
    return model

# Unzip the model package into the folder model_to_deploy without printing the output
def silent_unzip(model):
    os.system("mkdir model_to_deploy")
    os.system("tar -xzf " + model + " -C model_to_deploy")
    print("Model package unzipped successfully")

# Delete the downloaded model package and the model_to_deploy folder
def delete_files():
    os.system("rm -rf " + model)
    os.system("rm -rf model_to_deploy")

# Check that inference.py, requirements.txt and a code directory exist in the extracted directory or its subfolders
def check_files(directory):
    check = 0
    for root, dirs, files in os.walk(directory):
        if 'inference.py' in files and 'requirements.txt' in files:
            check += 1
        if 'code' in dirs:
            check += 1
    if check == 2:
        return True
    else:
        return False

model = download_model(s3_uri)
silent_unzip(model)

# Verify that the code/ directory (with inference.py and requirements.txt) exists in the extracted package
if check_files("model_to_deploy"):
    print("Code directory includes inference.py and requirements.txt")
    delete_files()
else:
    print("Code directory does not exist")
    print("Exiting")
    delete_files()
    exit(1)

# Deploy model to SageMaker
pt_model = PyTorchModel(
    model_data=s3_uri,        # S3 path where the trained model package is uploaded
    role=role,                # IAM role the SageMaker endpoint will use
    framework_version="2.0",  # PyTorch version used (actual: 2.1.1)
    py_version='py310',       # Python version used
)

# Deploy the model to a SageMaker Inference endpoint
predictor = pt_model.deploy(
    endpoint_name=endpoint_name,
    initial_instance_count=3,
    instance_type="ml.g4dn.xlarge"
)

# Test inference
def query_endpoint(payload: dict, endpoint_name: str) -> dict:
    client = boto3.client("sagemaker-runtime", region_name=os.environ["AWS_DEFAULT_REGION"])
    response = client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="application/json",
        Body=json.dumps(payload),
        CustomAttributes="accept_eula=true",
    )
    print(response)
    response = response["Body"].read().decode("utf8")
    response = json.loads(response)
    return response

response = query_endpoint({"inputs": ["How many policies are there?"]}, endpoint_name)
print(response)

Hooray!!! You have deployed the model to SageMaker.

Load testing the deployed endpoint:

We used Locust to perform load testing on the model endpoint. The SageMaker endpoint acts like a load balancer across its instances, so you can set the number of inference instances based on the load you are expecting.

Create a small script that queries the model endpoint:

import os
import json
import time
from locust import User, task, between
import boto3


def query_endpoint(payload, endpoint_name, content_type="application/json", message_id=""):
    client = boto3.client("sagemaker-runtime", region_name=os.environ["AWS_DEFAULT_REGION"])
    response = client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType=content_type,
        Body=json.dumps(payload),
        CustomAttributes="accept_eula=true",
    )
    response_body = response["Body"].read().decode("utf8")
    return json.loads(response_body)


class SageMakerEndpointUser(User):
    # wait_time = between(1, 2)
    endpoint_name = "<<name of the deployed endpoint>>"  # set to the endpoint deployed above
    input_queries = ["query-1", "query-2", ..., "query-200"]

    @task
    def query_task_classifier(self):
        for input_query in self.input_queries:
            payload = {"inputs": input_query}
            self.common_helper(payload, self.endpoint_name, "application/json")

    def common_helper(self, payload, endpoint_name, content_type):
        start_time = time.time()
        try:
            result = query_endpoint(payload, endpoint_name, content_type)
            # In real usage, you should determine the success condition properly
            self.environment.events.request.fire(
                request_type="SageMaker",
                name=endpoint_name,
                response_time=int((time.time() - start_time) * 1000),
                response=result,
                response_length=len(result),
            )
        except Exception as e:
            # Fire a failure event
            self.environment.events.request.fire(
                request_type="SageMaker",
                name=endpoint_name,
                response_time=int((time.time() - start_time) * 1000),
                response_length=0,
                exception=e,
            )


# Set environment variables
os.environ["AWS_DEFAULT_REGION"] = "us-west-2"

Now we execute this script with Locust, simulating 100 users running simultaneously for about 1 minute:

locust -f locust_test.py --headless --users 100 --spawn-rate 100 --run-time 1m --html=m_log_1.html --processes -1

The output will look something like this:

Type       Name            # reqs   # fails  |   Avg    Min    Max    Med |  req/s  failures/s
---------|---------------|--------|----------|------|------|-------|------|--------|-----------
SageMaker  endpoint-name   104053  0(0.00%)  |   574    192  60498    420 | 346.77        0.00
---------|---------------|--------|----------|------|------|-------|------|--------|-----------
           Aggregated      104053  0(0.00%)  |   574    192  60498    420 | 346.77        0.00

Response time percentiles (approximated)
Type       Name            50%   66%   75%   80%   90%   95%   98%   99%  99.9%  99.99%   100%   # reqs
---------|---------------|-----|-----|-----|-----|-----|-----|-----|-----|------|-------|-------|--------
SageMaker  endpoint-name   420   600   690   740   970  1300  1900  2700   5900   12000   60000   104053
---------|---------------|-----|-----|-----|-----|-----|-----|-----|-----|------|-------|-------|--------
           Aggregated      420   600   690   740   970  1300  1900  2700   5900   12000   60000   104053

Based on your requirements and use case, you can update the endpoint to scale vertically (change the instance type) or horizontally (change the number of instances).
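
For example, one way to resize the endpoint without touching the model is to create a new endpoint config and point the endpoint at it. A minimal sketch with boto3 (the new instance count, instance type, and config name below are illustrative):

import boto3

sm = boto3.client("sagemaker")

# Look up the variant behind the current endpoint config
current = sm.describe_endpoint(EndpointName=endpoint_name)
config = sm.describe_endpoint_config(EndpointConfigName=current["EndpointConfigName"])
variant = config["ProductionVariants"][0]

# Create a new config with the desired capacity and switch the endpoint to it
new_config_name = endpoint_name + "-scaled"
sm.create_endpoint_config(
    EndpointConfigName=new_config_name,
    ProductionVariants=[{
        "VariantName": variant["VariantName"],
        "ModelName": variant["ModelName"],
        "InitialInstanceCount": 5,           # horizontal scaling: more instances
        "InstanceType": "ml.g4dn.2xlarge",   # vertical scaling: a larger instance type
    }],
)
sm.update_endpoint(EndpointName=endpoint_name, EndpointConfigName=new_config_name)

SageMaker keeps serving traffic from the existing instances while the update rolls out, so the endpoint stays available during the change.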

We will discuss manual and automated MLOps pipelines for SageMaker in another blog.
