Concurrent load testing for ML models deployed in Cloud Pak using Locust

Before an enterprise productionizes a Machine Learning model and makes it available for online consumption, the model undergoes several tests. Concurrent load testing is one such test. In it, multiple simulated users simultaneously send requests to the REST API of a deployed online model, stressing the system so that its response time can be measured. It is often used to identify the bottleneck of an API, that is, to find out how many requests per second (RPS) it can handle effectively before its performance degrades beyond acceptable limits. Catching such limits early helps prevent production failures that could translate into substantial revenue loss.

To run this test effectively, we mainly need two Python packages.

Locust

There are many open-source tools available for concurrency testing, but in this article I will show how to test a model deployed in IBM Cloud Pak for Data, IBM's Data and AI platform, using the Locust package. Locust's documentation describes it as an easy-to-use, scriptable and scalable performance testing tool. If Python is your language of choice, it is the perfect tool, as Python is all you need to know to use it.

Gevent

The second package we'll use is Gevent, a Python networking library that uses greenlet to provide high-level synchronous APIs on top of the libev or libuv event loop. Because it is event-based, a single process can handle many thousands of concurrent users.
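To make that claim concrete, here is a tiny standalone sketch, independent of Locust, that spawns two greenlets which sleep concurrently rather than one after the other (runnable once the packages below are installed; simulated_user is just an illustrative name):

import gevent

def simulated_user(name, seconds):
    # gevent.sleep yields to the event loop instead of blocking the process,
    # so both greenlets can "wait" at the same time
    gevent.sleep(seconds)
    print(name, "finished after", seconds, "second(s)")

# Total wall time is about 1 second, not 2, because the greenlets overlap
greenlets = [
    gevent.spawn(simulated_user, "user-1", 1),
    gevent.spawn(simulated_user, "user-2", 1),
]
gevent.joinall(greenlets)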

Installation

The first step is to install both packages. In your terminal, run the following command.

pip install gevent locust

Locustfile

Once the installation succeeds, we need to write a locustfile, which is simply a Python file containing all the code Locust needs to execute the concurrent load test.

Let's go through the code step by step, starting with the necessary imports.

import gevent
from locust import HttpUser, task, between
from locust.env import Environment
from locust.stats import stats_printer, stats_history
from locust.log import setup_logging

Then, using Locust's built-in logging framework, we set up logging.

setup_logging("INFO", None)

Next, we need to define a class to simulate multiple IBM Cloud Pak for Data (CPD) users accessing the deployment. When a test starts, Locust creates an instance of this class for every user it simulates, and each of these users runs inside its own gevent greenlet (a lightweight green thread).

class CPDUser(HttpUser):

As an attribute of this class, we define wait_time, which determines how long a simulated user waits after each executed task before starting the next one. Here it is a random value between 0.2 and 1 second. You can also set a constant wait time to simulate a constant request load, as sketched after the snippet below.

    wait_time = between(0.2, 1)
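For example, Locust also ships a constant helper; a minimal sketch of the constant-load variant:

from locust import constant

class CPDUser(HttpUser):
    # Every simulated user waits exactly 1 second between tasks
    wait_time = constant(1)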

The host attribute is the URL prefix of the host to be load tested. For our test, I copied the URL prefix of the deployed XGBoost model.

    host = "https://dse-cpd45-cluster1.cpolab.ibm.com/ml/v4/deployments/credit_risk"

You can find the URL/endpoint by going to the desired deployment space in IBM Cloud Pak for Data and clicking on the deployment, which opens the API Reference tab shown in the screenshot below. After the /deployments/ section of the URL, you can use either the alphanumeric deployment ID or the serving name (if one was given). To learn more about the advantages of using a serving name, refer to this Medium article.

Deployed model and its REST endpoint
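To make the two addressing options concrete, the same deployment can be reached through either form of the prefix (the deployment ID below is made up purely for illustration):

base = "https://dse-cpd45-cluster1.cpolab.ibm.com/ml/v4/deployments"

# Option 1: the auto-generated alphanumeric deployment ID (fictional value)
host_by_id = base + "/8b3a2c1d1f2e4a5b9c0d1234567890ab"

# Option 2: the serving name assigned to the deployment
host_by_name = base + "/credit_risk"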

Methods decorated with @task are the core of your locustfile. For every user Locust simulates in its greenlet, these methods get called. Any action to be performed during the load test is declared as a method under the class using the @task decorator. The method I need for my load test is as follows:

    @task
    def my_task(self):
        token = ''
        header = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + token}
        payload_scoring = {'input_data': [{'values': [['less_0', 10.0, 'prior_payments_delayed', 'furniture', 1368.0, 'less_100', '1_to_4', 2.0, 'female', 'none', 2.0, 'car_other', 24.0, 'stores', 'own', 2.0, 'skilled', 1.0, 'none', 'yes']]}]}
        self.client.post("/predictions?version=2022-08-12", json=payload_scoring, headers=header, verify=False)

The token element has been intentionally left blank for security purposes, but you can generate the token using the code below. All you need to enter is your IBM Cloud Pak for Data username and password. Once the token is generated, copy and paste it into the empty string above.

# After entering each value, press Enter to move to the next one
from IPython.display import clear_output

WOS_CREDENTIALS = {}
WOS_CREDENTIALS["username"] = input("Username : ")
WOS_CREDENTIALS["password"] = input("Password : ")

import os
WOS_CREDENTIALS["url"] = os.environ['RUNTIME_ENV_APSX_URL']

import json
import requests
import base64
from requests.auth import HTTPBasicAuth
import time
import warnings
warnings.filterwarnings("ignore", message="numpy.dtype size changed")

def generate_access_token():
    headers = {}
    headers["Accept"] = "application/json"
    auth = HTTPBasicAuth(WOS_CREDENTIALS["username"], WOS_CREDENTIALS["password"])

    ICP_TOKEN_URL = WOS_CREDENTIALS["url"] + "/v1/preauth/validateAuth"

    response = requests.get(ICP_TOKEN_URL, headers=headers, auth=auth, verify=False)
    json_data = response.json()
    icp_access_token = json_data['accessToken']
    return icp_access_token

token = generate_access_token()
print(token)
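Rather than pasting the token into the locustfile by hand each time, one option is to export it as an environment variable and read it inside the task; a small sketch, where CPD_TOKEN is a hypothetical variable name of your choosing:

import os

# Hypothetical: export CPD_TOKEN in your shell before starting the test,
# then read it here instead of hardcoding the token in the file
token = os.environ.get("CPD_TOKEN", "")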

The rest of the code in the method follows the structure of the code snippet given in the API Reference tab of the deployment.

Code snippets to score the deployed model
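By default, Locust counts any non-error HTTP status as a success. If you also want to validate the scoring response yourself, Locust's catch_response flag lets you mark a request as failed based on your own criteria. Below is a minimal sketch of such a variant (my_validated_task is a hypothetical name; the header and payload are built exactly as in my_task above, and the check for a 'predictions' field assumes the usual shape of a WML scoring response):

    @task
    def my_validated_task(self):
        token = ''  # paste the generated token here, as before
        header = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + token}
        payload_scoring = {'input_data': [{'values': [['less_0', 10.0, 'prior_payments_delayed', 'furniture', 1368.0, 'less_100', '1_to_4', 2.0, 'female', 'none', 2.0, 'car_other', 24.0, 'stores', 'own', 2.0, 'skilled', 1.0, 'none', 'yes']]}]}
        # catch_response=True lets us decide ourselves what counts as a failure
        with self.client.post("/predictions?version=2022-08-12", json=payload_scoring,
                              headers=header, verify=False, catch_response=True) as response:
            if response.status_code != 200 or 'predictions' not in response.text:
                response.failure("Unexpected scoring response: " + str(response.status_code))
            else:
                response.success()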

The code to access and score a deployment is ready. All we need to do now is set up the environment and runner to simulate the various concurrent load testing scenarios. First, we instantiate an Environment and point it to the CPDUser class we just created, then create a local runner from that environment.

env = Environment(user_classes=[CPDUser])
env.create_local_runner()

Next, we define the address and port at which to create the local Locust web UI instance.

env.create_web_ui("127.0.0.1", 8089)

We then start greenlets that periodically print the current stats and save them to the stats history. These stats are visible in the UI while the test is running and are updated dynamically.

gevent.spawn(stats_printer(env.stats))
gevent.spawn(stats_history, env.runner)

Now we start the test, defining the number of users (peak concurrency) and the spawn rate (users started per second). In the code below, the number of users is 10 and the spawn rate is 1, meaning the test starts with 1 user and adds 1 user per second until it reaches the maximum of 10.

env.runner.start(10, spawn_rate=1)

The code below schedules a greenlet to run at a later time; in our case, it quits the environment runner after 100 seconds. We then wait for the runner's greenlet to finish and stop the web server for good.

gevent.spawn_later(100, lambda: env.runner.quit())
env.runner.greenlet.join()
env.web_ui.stop()
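For convenience, here is the complete locustfile assembled from the pieces above (the token is still intentionally left blank; paste your generated token before running):

import gevent
from locust import HttpUser, task, between
from locust.env import Environment
from locust.stats import stats_printer, stats_history
from locust.log import setup_logging

setup_logging("INFO", None)

class CPDUser(HttpUser):
    wait_time = between(0.2, 1)
    host = "https://dse-cpd45-cluster1.cpolab.ibm.com/ml/v4/deployments/credit_risk"

    @task
    def my_task(self):
        token = ''  # paste the generated token here
        header = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + token}
        payload_scoring = {'input_data': [{'values': [['less_0', 10.0, 'prior_payments_delayed', 'furniture', 1368.0, 'less_100', '1_to_4', 2.0, 'female', 'none', 2.0, 'car_other', 24.0, 'stores', 'own', 2.0, 'skilled', 1.0, 'none', 'yes']]}]}
        self.client.post("/predictions?version=2022-08-12", json=payload_scoring, headers=header, verify=False)

env = Environment(user_classes=[CPDUser])
env.create_local_runner()
env.create_web_ui("127.0.0.1", 8089)

gevent.spawn(stats_printer(env.stats))
gevent.spawn(stats_history, env.runner)

env.runner.start(10, spawn_rate=1)
gevent.spawn_later(100, lambda: env.runner.quit())
env.runner.greenlet.join()
env.web_ui.stop()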

Once the test completes, the final stats can be viewed and a report can be downloaded. For the test I ran above, I got the following report. It starts with basic information about when the test was conducted, its duration, and the host on which it was run.

High level information from the locust test report

The Request and Response Time statistics give the key numbers of the concurrent load test for my deployment. Over the 100-second test, the deployment received a total of 1182 requests, at a rate of 12 requests per second (RPS) and with 0 failures; the average response time was 197 ms.
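If you prefer to pull these headline numbers programmatically rather than from the report, the environment's stats object exposes them once the run has finished; a small sketch using attributes from Locust's stats module:

# Run after env.runner.greenlet.join() has returned
total = env.stats.total
print("Requests:          ", total.num_requests)
print("Failures:          ", total.num_failures)
print("Avg response (ms): ", total.avg_response_time)
print("p90 response (ms): ", total.get_response_time_percentile(0.90))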

Since response times can vary widely, the average alone is not enough to judge consistent performance. If the goal, say, is for at least 90% of all responses to be serviced in less than 300 ms, then the test has revealed that this is not currently met, since only about 80 percent of all requests are actually serviced in less than 250 ms. This points to a potential bottleneck, which we can address by increasing the number of model copies so that more requests are handled at the same time.

Various statistics of the test report
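As a toy illustration of why the mean alone can mislead (the numbers below are made up, not taken from this test), two response-time distributions can share the same average while having very different 90th percentiles:

import numpy as np

# Hypothetical response times in ms; both sets have the same mean of 200 ms
steady = np.array([190, 195, 200, 205, 210] * 20)
spiky = np.array([120, 130, 140, 150, 460] * 20)

for name, times in [("steady", steady), ("spiky", spiky)]:
    print(name, "mean:", times.mean(), "p90:", np.percentile(times, 90))
# steady p90 is 210 ms, while spiky p90 is 460 ms despite the identical mean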

The report statistics are also available as charts for better visualization.

Plot of total request per second over the duration of the test
Plot of variation of response times over the duration of the test

Conclusion

Concurrent load testing is a key stress test that should be performed on every AI model before it is productionized, both to identify operational bottlenecks and to ensure a smooth user experience. Being able to script the test as shown here makes the process much smoother and quicker. For industries with mission-critical operations, this test is essential to prevent production failures.

I hope this article was informative and helps you set up these tests for models deployed in IBM Cloud Pak for Data. Thank you for reading, and I look forward to your feedback in the comments!
