Scalable Batch Inference on Fine-Tuned Model using Ray

Manish
Techsalo Infotech
4 min read · Aug 25, 2023

This is Part 2 of the article on how to fine-tune your LLMs and run batch inference. In Part 1 we fine-tuned the base model FLAN-T5 on the Alpaca dataset and stored the fine-tuned model checkpoints. In this part we run distributed batch inference at scale using Ray.

We will use Ray AIR’s “BatchPredictor” utility for large-scale batch inference.
“BatchPredictor” takes a checkpoint (here, our fine-tuned model) and a predictor class. When we call its predict() function on a dataset, it runs batch prediction over the entire dataset across the cluster.
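
To make this division of labour concrete, here is a minimal pure-Python sketch of the pattern. It is not Ray's implementation: `ToyPredictor`, `iter_batches`, `toy_batch_predict`, and the dict used as a "checkpoint" are hypothetical names made up for illustration, and the real BatchPredictor additionally spreads the batches across the workers of the cluster.

```python
from typing import Any, Dict, Iterator, List


class ToyPredictor:
    """Hypothetical stand-in for a Predictor that wraps a 'model' from a checkpoint."""

    def __init__(self, suffix: str) -> None:
        self.suffix = suffix

    @classmethod
    def from_checkpoint(cls, checkpoint: Dict[str, Any]) -> "ToyPredictor":
        # A real predictor would load model weights here.
        return cls(checkpoint["suffix"])

    def predict(self, batch: List[str]) -> List[str]:
        # The "model" simply appends a suffix to every input.
        return [text + self.suffix for text in batch]


def iter_batches(rows: List[str], batch_size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of at most batch_size rows."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


def toy_batch_predict(
    checkpoint: Dict[str, Any],
    predictor_cls: type,
    rows: List[str],
    batch_size: int,
) -> List[str]:
    # Build the predictor once, then reuse it for every batch.
    predictor = predictor_cls.from_checkpoint(checkpoint)
    outputs: List[str] = []
    for batch in iter_batches(rows, batch_size):
        outputs.extend(predictor.predict(batch))
    return outputs


results = toy_batch_predict(
    {"suffix": "!"}, ToyPredictor, ["a", "b", "c", "d", "e"], batch_size=2
)
print(results)
```

The point of the pattern is that model loading happens once per predictor, while the dataset is consumed batch by batch.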

We start by importing the modules needed for the task at hand.

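from typing import Any, Dict, List, Optional
import pandas as pd
import torch
from ray.train.predictor import Predictor
from ray.train.batch_predictor import BatchPredictor
from transformers import AutoTokenizer, T5ForConditionalGeneration, T5Tokenizer

use_gpu = True  # set to False to run prediction on CPU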

Next we define the HuggingFaceModelPredictor class, which extends Ray AIR’s Predictor class to generate text for input instructions. The class defines the following:

· The predictor takes a trained Hugging Face model, a tokenizer, and a preprocessor (which can be a function that takes raw input data and returns tokenized input data).

· from_checkpoint creates a HuggingFaceModelPredictor from a checkpoint containing a trained Hugging Face model.

· _predict_numpy generates text given input data in the form of a dictionary and returns a Pandas DataFrame with a single column "generated_output" containing the generated text.

class HuggingFaceModelPredictor(Predictor):
    """
    A Ray Predictor for Hugging Face models that generates text given input data.

    Args:
        model (transformers.PreTrainedModel): A trained Hugging Face model.
        tokenizer (Optional[transformers.PreTrainedTokenizerBase]): A tokenizer
            that can tokenize input text.
        preprocessor (Optional[Callable]): A function that takes raw input data
            and returns tokenized input data.
        use_gpu (bool): Whether to use a GPU or CPU for prediction.
    """

    def __init__(
        self,
        model: Any,
        tokenizer: Optional[Any] = None,
        preprocessor: Optional[Any] = None,
        use_gpu: bool = False,
    ) -> None:
        super().__init__(preprocessor)
        self.model = model
        self.use_gpu = use_gpu
        self.tokenizer = tokenizer

    @classmethod
    def from_checkpoint(
        cls,
        checkpoint: Any,
        model_cls: Any,
        *,
        tokenizer: Optional[Any] = None,
        use_gpu: bool = False,
        **get_model_kwargs: Any,
    ) -> "HuggingFaceModelPredictor":
        """
        Create a HuggingFaceModelPredictor from a checkpoint.

        Args:
            checkpoint (Any): A checkpoint containing a trained Hugging Face model.
            model_cls (Any): The type of Hugging Face model to load from the checkpoint.
            tokenizer (Optional[Any]): A tokenizer that can tokenize input text.
            use_gpu (bool): Whether to use a GPU or CPU for prediction.
            **get_model_kwargs (Any): Additional keyword arguments for loading
                the Hugging Face model.

        Returns:
            HuggingFaceModelPredictor: A Ray Predictor for the Hugging Face model.
        """
        # Fall back to AutoTokenizer when no tokenizer is given.
        if not tokenizer:
            tokenizer = AutoTokenizer
        # A tokenizer class (rather than an instance) is loaded from the checkpoint.
        if isinstance(tokenizer, type):
            tokenizer = checkpoint.get_tokenizer(tokenizer)
        return cls(
            checkpoint.get_model(model_cls, **get_model_kwargs),
            tokenizer=tokenizer,
            preprocessor=checkpoint.get_preprocessor(),
            use_gpu=use_gpu,
        )

    def _predict_numpy(
        self,
        data: Dict[str, Any],
        feature_columns: Optional[List[str]] = None,
        **generate_kwargs: Any,
    ) -> pd.DataFrame:
        """
        Generates text given input data.

        Args:
            data (Dict[str, Any]): A dictionary of input data.
            feature_columns (Optional[List[str]]): A list of feature column names
                to use for prediction.
            **generate_kwargs (Any): Additional keyword arguments for generating text.

        Returns:
            pd.DataFrame: A Pandas DataFrame with a single column "generated_output"
                containing the generated text.
        """
        # The text arrives already tokenized because the tokenizer runs as an AIR preprocessor.
        if feature_columns:
            data = {k: v for k, v in data.items() if k in feature_columns}

        # Convert each NumPy array to a tensor on the model's device.
        data = {
            k: torch.from_numpy(v).to(device=self.model.device) for k, v in data.items()
        }
        generate_kwargs = {**data, **generate_kwargs}

        outputs = self.model.generate(**generate_kwargs)
        return pd.DataFrame(
            self.tokenizer.batch_decode(outputs, skip_special_tokens=True),
            columns=["generated_output"],
        )
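
The input handling at the start of _predict_numpy (the optional column filter followed by the keyword merge) can be tried in isolation. The sketch below is a simplified illustration rather than part of the predictor: `build_generate_kwargs` is a hypothetical helper, plain lists stand in for NumPy arrays, the column names are assumed to be typical tokenizer outputs, and the tensor conversion and the call to generate() are left out.

```python
from typing import Any, Dict, List, Optional


def build_generate_kwargs(
    data: Dict[str, Any],
    feature_columns: Optional[List[str]] = None,
    **generate_kwargs: Any,
) -> Dict[str, Any]:
    """Hypothetical helper mirroring the first steps of _predict_numpy."""
    # Keep only the requested columns, if a filter was given.
    if feature_columns:
        data = {k: v for k, v in data.items() if k in feature_columns}
    # Merge batch columns with generation options; on a key clash the
    # explicit generation option wins because it is unpacked last.
    return {**data, **generate_kwargs}


# Assumed column names for a tokenized batch (not taken from the article).
batch = {
    "input_ids": [[101, 7, 1]],
    "attention_mask": [[1, 1, 1]],
    "labels": [[42, 1]],
}

kwargs = build_generate_kwargs(
    batch,
    feature_columns=["input_ids", "attention_mask"],
    max_new_tokens=128,
)
print(sorted(kwargs))
```

Without a `feature_columns` filter every column of the batch is forwarded, so a dataset that still carries columns the model's generate() does not accept would need either the filter or a preprocessing step that drops them.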

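Next we call BatchPredictor’s “from_checkpoint” method and configure it with the fine-tuned model checkpoint stored in “result.checkpoint” and with our predictor class defined above, HuggingFaceModelPredictor. We also pass the model class, T5ForConditionalGeneration, along with the tokenizer class. The remaining keyword arguments (device_map and torch_dtype) are forwarded to the predictor's from_checkpoint as get_model_kwargs and used when loading the model.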

predictor = BatchPredictor.from_checkpoint(
    checkpoint=result.checkpoint,
    predictor_cls=HuggingFaceModelPredictor,
    model_cls=T5ForConditionalGeneration,
    tokenizer=T5Tokenizer,
    use_gpu=use_gpu,
    device_map="auto",
    torch_dtype=torch.float16,
)
(raylet) Spilled 2902 MiB, 8 objects, write throughput 815 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.

Next we make the predictions by calling the predict() function. It is important to understand that predict() loads the entire dataset into cluster memory. This may be a problem if your dataset is larger than the available cluster memory; in that case, use the predict_pipelined() function instead of predict().
More details here: https://docs.ray.io/en/latest/ray-air/predictors.html#pipelined-prediction.

prediction = predictor.predict(
    validation_dataset,
    num_gpus_per_worker=int(use_gpu),
    batch_size=256,
    max_new_tokens=128,
)
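
The memory trade-off between predict() and predict_pipelined() mentioned above comes down to how many rows are held at once. The following pure-Python sketch illustrates the windowing idea only; it is not how Ray implements pipelined prediction, and `predict_in_windows`, `double`, and `window_size` are hypothetical names chosen for the example.

```python
from typing import Callable, Iterable, Iterator, List

seen_window_sizes: List[int] = []


def double(window: List[int]) -> List[int]:
    """Toy 'model': records how many rows it was handed, then doubles them."""
    seen_window_sizes.append(len(window))
    return [x * 2 for x in window]


def predict_in_windows(
    rows: Iterable[int],
    predict_fn: Callable[[List[int]], List[int]],
    window_size: int,
) -> Iterator[int]:
    """Consume rows lazily, holding at most window_size of them in memory."""
    window: List[int] = []
    for row in rows:
        window.append(row)
        if len(window) == window_size:
            yield from predict_fn(window)
            window = []
    # Flush the final, possibly smaller, window.
    if window:
        yield from predict_fn(window)


# The input is an iterator, so the full dataset is never materialized here.
outputs = list(predict_in_windows(iter(range(10)), double, window_size=4))
print(outputs)
print(seen_window_sizes)
```

Peak memory is bounded by the window size rather than by the dataset size, at the cost of processing the data in several passes through the model.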

Next we display the results with input and output side by side as a dataframe.

# Display inputs and generated outputs side by side.
input_data_pd = validation_dataset.to_pandas()
prediction_pd = prediction.to_pandas()

input_data_pd.join(prediction_pd, how='inner')
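
The join above aligns the two frames on their row index, and how='inner' keeps only index labels present in both. The small sketch below shows that behaviour with made-up rows (the column name `instruction` and all values are illustrative, not results from the fine-tuned model). It assumes, as the snippet above implicitly does, that both frames carry the default 0..n-1 index and that predictions come back in input order.

```python
import pandas as pd

# Made-up inputs and outputs, both with the default RangeIndex.
inputs = pd.DataFrame(
    {"instruction": ["Translate hello", "Add 2 and 2", "Name a colour"]}
)
generated = pd.DataFrame({"generated_output": ["bonjour", "4", "blue"]})

# Rows are paired by index label, not by any column value.
side_by_side = inputs.join(generated, how="inner")
print(side_by_side)

# If one frame is shorter, the inner join silently drops the unmatched rows.
truncated = inputs.join(generated.iloc[:2], how="inner")
print(len(truncated))
```

Comparing the length of the joined frame with the length of the input frame is a cheap check that no rows were dropped.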

Note: We at Techsalo Infotech are a team of engineers solving complex data engineering and machine learning problems. Please reach out to us at sales@techsalo.com with any questions on how to build these systems at scale and in the cloud.
