Scalable Batch Inference on Large Language Models Using Ray

Büşra Korkmaz
Published in KoçDigital
9 min read · Oct 31, 2023

Introduction

Large language models (LLMs) have become very popular in recent years. To use these models effectively, we need to integrate them with current technologies. At KoçDigital, we have been using the Ray library in our projects since 2019 and achieving significant performance improvements. In this article, co-authored with Murat Koc, we explore the batch inference performance of an LLM with the Ray library. We show how to do this using Ray Data, one of the Ray AI Runtime (AIR) libraries. For this, we create a Ray cluster to work on and compare two types of batch inference: sequential and distributed. In this study, we use the GPT2-Large model for text generation. The data consists of repeated questions, 6480 lines in total.

In this post, we will cover the following topics:

  • Inference with Ray AIR Datasets.
  • Get to know the inference task.
  • Implement sequential inference.
  • Learn about three distributed batch inference design patterns with Ray.
  • Implement distributed inference patterns.

1. Scaling Batch Inference with Ray

What is batch inference?

Batch Inference Diagram

Batch inference, also known as offline inference, is the process of generating predictions for a set of observations at once. The predictions are usually generated on a recurring schedule and stored in a database, where they can be made available to developers. For example, they can be processed further, and new predictions can be created from them using big data technologies.
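As a minimal sketch of the idea, independent of Ray, the loop below splits a dataset into fixed-size batches, applies a stand-in model to each batch, and collects the predictions. The names iter_batches, fake_model, and run_batch_inference are hypothetical and not part of any library; the row count and batch size are the ones used in the experiments later in this article.

```python
from typing import Callable, Iterator, List


def iter_batches(rows: List[str], batch_size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of `rows` with at most `batch_size` items."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


def fake_model(batch: List[str]) -> List[str]:
    """Stand-in for a real model: returns one 'prediction' per input row."""
    return [text.upper() for text in batch]


def run_batch_inference(
    rows: List[str],
    model: Callable[[List[str]], List[str]],
    batch_size: int,
) -> List[str]:
    """Apply `model` batch by batch and collect all predictions in order."""
    predictions: List[str] = []
    for batch in iter_batches(rows, batch_size):
        predictions.extend(model(batch))
    return predictions


rows = [f"question {i}" for i in range(6480)]
batch_sizes = [len(batch) for batch in iter_batches(rows, 250)]
predictions = run_batch_inference(rows, fake_model, 250)

# 6480 rows with a batch size of 250 give 25 full batches plus one of 230 rows.
print(len(batch_sizes), batch_sizes[-1], len(predictions))  # 26 230 6480
```

In a real pipeline the collected predictions would be written to storage instead of being kept in memory.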

Batch inference using Ray AIR Data

Ray Data is a library for large-scale batch inference, whether sequential or distributed. Ray AIR supports various predictors, such as TorchPredictor, HuggingFacePredictor, and TensorflowPredictor.

2. Prepare Environment

We ran the following commands to create the Ray cluster.

ray stop # Stop any cluster that is already running
export RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1 # Show detailed worker logs for actor classes
export CUDA_VISIBLE_DEVICES=2,3,5 # Select the GPUs the Ray cluster will run on (we have 6 GPUs and chose three of them)
ray start --head --num-cpus=30 --num-gpus=3 --port=8266 --dashboard-port=8267 # Start the Ray cluster with 3 GPUs and 30 CPUs

We used the following command to submit the Python code in this article to the Ray cluster (ray_batch_inf.py is the file we created).

RAY_ADDRESS='http://127.0.0.1:8267' ray job submit --working-dir . -- python ray_batch_inf.py

Ray version: 2.7.1

Python version: 3.10.4

Using the ray status command, we displayed the information for the cluster we created, as follows.

Ray Cluster Info

3. Sequential Batch Inference

To begin this batch inference task, we take a basic approach in which a single worker generates predictions one batch after another. We process all the data by dividing it into batches and observe how much time this takes.

We carried out two studies under the title of Sequential Batch Inference. In these studies, we observed the difference between GPU and CPU usage when performing sequential batch inference. You can change the inference time by reducing the number of rows passed to take_batch, which is called on the result of map_batches, or by increasing or decreasing the batch_size parameter of map_batches.
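The listings in this article time each step by calling time.time() before and after it. As an optional sketch of the same idea, the hypothetical context manager below wraps that pattern. The name timed and the timings dictionary are our own and not part of Ray, and the sketch uses time.perf_counter(), a monotonic clock, in place of time.time().

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator


@contextmanager
def timed(label: str, results: Dict[str, float]) -> Iterator[None]:
    """Measure the wall-clock time of the enclosed block and store it under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start
        print(f"--------- {label} -------")
        print(results[label])


timings: Dict[str, float] = {}
with timed("MODEL INFERENCE TIME", timings):
    time.sleep(0.01)  # placeholder for the real work, e.g. a model call
```

Collecting the measurements in a dictionary makes it easier to compare runs than reading them back from worker logs.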

from typing import Dict
import time

import numpy as np
import ray
from transformers import pipeline

ds = ray.data.read_text("data.txt")


class HuggingFacePredictor:
    def __init__(self):
        start = time.time()
        # We added the device="cuda" parameter for GPU usage.
        self.model = pipeline("text-generation", model="gpt2-large", device="cuda")
        end = time.time()

        print("--------- MODEL LOADING TIME -------")
        print(end - start)

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        print("CALL METHOD")
        start = time.time()
        predictions = self.model(list(batch["text"]), max_length=20, num_return_sequences=1)
        batch["output"] = [sequences[0]["generated_text"] for sequences in predictions]
        end = time.time()

        print("--------- MODEL INFERENCE TIME -------")
        print(end - start)
        return batch


start = time.time()

predictions = ds.map_batches(
    HuggingFacePredictor,
    num_gpus=1,  # GPU capacity of each worker
    # num_cpus=24,  # CPU capacity of each worker (used instead of num_gpus for the CPU run)
    batch_size=250,  # Split the 6480 rows into batches of 250
    compute=ray.data.ActorPoolStrategy(size=1),  # Number of workers; 1 because this run is sequential
).take_batch(6480)  # Number of rows we want to predict from the dataset

end = time.time()

print("--------------- TOTAL TIME -----------------")
print(end - start)
print(predictions)

Scenario 1: 1 Worker & 1 GPU & 6480 Data & 250 Batch Size

As can be seen in the code, adding the device="cuda" parameter to the pipeline function makes the model use the GPU for prediction. For the same purpose, we also set the GPU capacity of each worker with the num_gpus parameter of the map_batches function. For sequential batch inference, we ran the predictions with 1 worker.

When we view the Cluster status via the Cluster Tab of the Ray Dashboard, we can see that 1 GPU is used as follows.

GPU Usage

We can see the running workers and their status through the Actor Tab of the Ray Dashboard as follows. We can also track the log separately for each worker from here.

By using the nvidia-smi command on the machine, we can view the worker running on the machine and which GPU it is using. Since we give 1 GPU to 1 worker, we get an output like the following.

With 1 worker holding 1 GPU and a batch size of 250, text generation over the 6480 rows with the GPT2-Large model takes approximately 23 minutes. In the next scenario, we will see the speed difference created by using the CPU instead of the GPU.

Scenario 2: 1 Worker & 24 CPU & 6480 Data & 250 Batch Size

When we remove the device="cuda" parameter from the pipeline function that creates the model and use the num_cpus parameter instead of num_gpus in the map_batches function, we disable the use of GPUs. When we give 24 CPUs to 1 worker and examine the Cluster tab of the Ray Dashboard, we see that CPU usage is almost 100%, as follows. We can also view the status and logs of running workers via the Actor tab, as above.
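The CPU variant is described here only in prose. As a hedged sketch, the hypothetical helper below builds the two sets of keyword arguments that differ between the GPU and CPU runs: those for pipeline(...) and those for map_batches(...). The helper name scenario_kwargs and its parameters are our own illustration; only model, device, num_gpus, num_cpus, and batch_size come from the listing in this article.

```python
from typing import Dict, Tuple


def scenario_kwargs(
    use_gpu: bool, per_worker: float, batch_size: int = 250
) -> Tuple[Dict, Dict]:
    """Return (pipeline_kwargs, map_batches_kwargs) for a GPU or CPU run."""
    pipeline_kwargs: Dict = {"model": "gpt2-large"}
    map_batches_kwargs: Dict = {"batch_size": batch_size}
    if use_gpu:
        pipeline_kwargs["device"] = "cuda"           # load the model on the GPU
        map_batches_kwargs["num_gpus"] = per_worker  # GPU share reserved per worker
    else:
        # No device argument is passed, as in the article's CPU runs.
        map_batches_kwargs["num_cpus"] = per_worker  # CPU cores reserved per worker
    return pipeline_kwargs, map_batches_kwargs


# Scenario 2: one worker with 24 CPUs.
pipe_kw, map_kw = scenario_kwargs(use_gpu=False, per_worker=24)
print(pipe_kw)  # {'model': 'gpt2-large'}
print(map_kw)   # {'batch_size': 250, 'num_cpus': 24}
```

These dictionaries could then be unpacked into the existing calls, for example pipeline("text-generation", **pipe_kw) inside the predictor and ds.map_batches(HuggingFacePredictor, compute=ray.data.ActorPoolStrategy(size=1), **map_kw).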

CPU Usage

Since this job does not run on the GPU, we used the ps -ef | grep ray command to display the worker, as follows.

With 1 worker holding 24 CPUs and a batch size of 250, text generation over the 6480 rows with the GPT2-Large model takes approximately 5 hours. In the next steps, we will run the distributed batch inference scenarios.

4. Distributed Batch Inference

Distributed batch inference replicates the model on multiple workers so that batches can be processed in parallel.

We carried out two studies under the title of Distributed Batch Inference. In these scenarios, we clearly observed the difference in inference speed between sequential and distributed execution. We also examined the difference between GPU and CPU usage when performing distributed batch inference. You can change the inference time by reducing the number of rows passed to take_batch, which is called on the result of map_batches, or by increasing or decreasing the batch_size parameter. In addition, you can increase or decrease the number of workers by changing the size value of the compute parameter in the map_batches function.
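To make the division of work concrete, the sketch below counts how many batches each of 6 workers would handle for 6480 rows and a batch size of 250. It assumes batches are dealt out round-robin, which is a simplification of our own: Ray Data assigns work to its actor pool dynamically, so the real distribution can differ.

```python
import math

NUM_ROWS = 6480
BATCH_SIZE = 250
NUM_WORKERS = 6

num_batches = math.ceil(NUM_ROWS / BATCH_SIZE)

# Simplifying assumption: batch i goes to worker i % NUM_WORKERS.
# Ray Data's real scheduling is dynamic; this only illustrates the balance of work.
batches_per_worker = [
    len(range(worker, num_batches, NUM_WORKERS)) for worker in range(NUM_WORKERS)
]

print(num_batches)         # 26
print(batches_per_worker)  # [5, 5, 4, 4, 4, 4]
```

Under this simplified model, no worker processes more than 5 of the 26 batches, which is where the time saving over a single worker comes from.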

from typing import Dict
import time

import numpy as np
import ray
from transformers import pipeline

ds = ray.data.read_text("data.txt")


class HuggingFacePredictor:
    def __init__(self):
        start = time.time()
        # We added the device="cuda" parameter for GPU usage.
        self.model = pipeline("text-generation", model="gpt2-large", device="cuda")
        end = time.time()

        print("--------- MODEL LOADING TIME -------")
        print(end - start)

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        print("CALL METHOD")
        start = time.time()
        predictions = self.model(list(batch["text"]), max_length=20, num_return_sequences=1)
        batch["output"] = [sequences[0]["generated_text"] for sequences in predictions]
        end = time.time()

        print("--------- MODEL INFERENCE TIME -------")
        print(end - start)
        return batch


start = time.time()

predictions = ds.map_batches(
    HuggingFacePredictor,
    num_gpus=0.5,  # GPU capacity of each worker
    # num_cpus=4,  # CPU capacity of each worker (used instead of num_gpus for the CPU run)
    batch_size=250,  # Split the 6480 rows into batches of 250
    compute=ray.data.ActorPoolStrategy(size=6),  # Number of workers; 6 because this run is distributed
).take_batch(6480)  # Number of rows we want to predict from the dataset

end = time.time()

print("--------------- TOTAL TIME -----------------")
print(end - start)
print(predictions)

Scenario 3: 6 Worker & 0.5 GPU & 6480 Data & 250 Batch Size

To speed up the first scenario, we performed distributed batch inference by increasing the number of workers. This reduced the inference time of the first scenario considerably. In the Cluster tab of the Ray Dashboard, we can see that all 3 GPUs are used at full capacity, as seen below. We can review all the workers we run and their status under the Actor tab.
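The worker counts in these scenarios follow from simple resource arithmetic: each worker reserves a share of the cluster's logical resources, and a worker is only scheduled if its request still fits. The hypothetical helper below, max_workers, is our own name and not a Ray API. It ignores any other resource requests and does not check GPU memory, which workers sharing a GPU have to fit into.

```python
from fractions import Fraction


def max_workers(total, per_worker) -> int:
    """Number of workers that fit when each reserves `per_worker` of `total`."""
    # Fractions avoid floating-point surprises with values such as 0.5.
    return int(Fraction(str(total)) // Fraction(str(per_worker)))


CLUSTER_GPUS = 3   # from `ray start --num-gpus=3`
CLUSTER_CPUS = 30  # from `ray start --num-cpus=30`

print(max_workers(CLUSTER_GPUS, 1))    # 3
print(max_workers(CLUSTER_GPUS, 0.5))  # 6
print(max_workers(CLUSTER_CPUS, 24))   # 1
print(max_workers(CLUSTER_CPUS, 4))    # 7
```

With num_gpus=0.5, six workers reserve exactly the 3 GPUs of the cluster, two workers per GPU.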

By using the nvidia-smi command on the machine, we can view the 6 workers running on the machine and how they are placed on the GPUs. Since we give 0.5 GPU to each of the 6 workers, we get an output as follows.

With 6 workers holding 0.5 GPU each and a batch size of 250, text generation over the 6480 rows with the GPT2-Large model takes approximately 6 minutes. In the next scenario, we will examine distributed batch inference on the CPU.

Scenario 4: 6 Worker & 4 CPU & 6480 Data & 250 Batch Size

When we remove the device="cuda" parameter from the pipeline function that creates the model and use the num_cpus parameter instead of num_gpus in the map_batches function, we disable the use of GPUs. When we give 4 CPUs to each of the 6 workers and examine the Cluster tab of the Ray Dashboard, we see that CPU utilization is almost 100%, as below. We can also view the status and logs of the running workers on the Actor tab, as above.

Since this job does not run on the GPU, we used the ps -ef | grep ray command to display the workers, as follows.

With 6 workers holding 4 CPUs each and a batch size of 250, text generation over the 6480 rows with the GPT2-Large model takes approximately 1 hour.

Don't forget to shut down Ray.
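The four reported times can be put side by side with a few lines of arithmetic. The sketch below uses only the approximate figures stated in this article (23 minutes, 5 hours, 6 minutes, 1 hour), so the resulting throughput and speedup values are indicative, not precise measurements.

```python
NUM_ROWS = 6480

# Approximate wall-clock times reported in this article, in minutes.
minutes = {
    "1 worker, 1 GPU": 23,
    "1 worker, 24 CPUs": 5 * 60,
    "6 workers, 0.5 GPU each": 6,
    "6 workers, 4 CPUs each": 60,
}

for name, duration in minutes.items():
    rows_per_second = NUM_ROWS / (duration * 60)
    print(f"{name}: {rows_per_second:.2f} rows/s")

# Speedup of the distributed run over the sequential run on the same device type.
gpu_speedup = minutes["1 worker, 1 GPU"] / minutes["6 workers, 0.5 GPU each"]
cpu_speedup = minutes["1 worker, 24 CPUs"] / minutes["6 workers, 4 CPUs each"]
print(f"GPU: {gpu_speedup:.1f}x, CPU: {cpu_speedup:.1f}x")  # GPU: 3.8x, CPU: 5.0x
```

Note that the sequential and distributed runs reserve different amounts of hardware (1 GPU versus 3 GPUs, 24 CPUs versus 24 CPUs split across 6 workers), so these ratios compare configurations as run, not equal hardware budgets.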

ray.shutdown()

Conclusion

Batch inference is the process of applying a trained model to a large dataset offline. In this post, we use the Ray Data library, part of the Ray AI Runtime, to perform batch inference with a large language model (LLM). We compare the performance of two different batch inference methods: sequential and distributed. We also examine how different parameters, such as GPU and CPU usage, affect the inference speed. We find that Ray Data offers a flexible and scalable solution for offline batch inference tasks.
