Recap DevFest Cloud Bangkok 2023: Leveraging Ray and Vertex AI for LLMOps

Natthanan Bhukan
11 min read · Feb 24, 2024


After introducing the Ray framework at DevFest Cloud Bangkok 2023, I would like to recap and summarise my presentation in this article. In late 2023, GCP announced a new integration between Ray and Vertex AI, which GCP named Ray on Vertex AI. This service lets Ray users develop software inside the Vertex AI ecosystem without losing the benefits of Ray. Below, I recap each part of my talk and demonstrate the LLMOps portion with Ray on Vertex AI.

Table of contents

  • What is Ray?
  • Benefits of using Ray
  • Why Ray on Vertex AI?
  • Demo with LLMOps
  • Bonus : Recap from Ray Summit 2023

What is Ray?

Ray is a framework that simplifies distributed computing workloads: developers focus on writing native Python that solves their business problems, while the framework handles task parallelism. Besides that, Ray provides heterogeneous clusters, auto-scaling, and executor lifecycle management, so users don't need to manage clusters and scaling themselves.
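
As a taste of what that looks like, here is a minimal sketch (my own example, not from the talk): an ordinary Python function becomes a distributed task with a single decorator.

import ray

ray.init()  # start a local Ray runtime, or connect to an existing cluster

@ray.remote
def square(x: int) -> int:
    # A plain Python function; Ray schedules it on any worker in the cluster.
    return x * x

# Calling .remote() submits tasks in parallel and returns object references.
futures = [square.remote(i) for i in range(8)]
print(ray.get(futures))  # [0, 1, 4, 9, 16, 25, 36, 49]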

Figure 1 — Ray Component

Moreover, Ray includes the Ray AI Runtime (Ray AIR), a rich set of libraries that bring distributed computing to each stage of the ML product life cycle. These libraries can be separated into four categories.

  1. Ray Data: Ray provides functions to load data and apply user-defined processing functions, with built-in support for distributed data processing.
  2. Ray Train: provides a wrapper that turns ordinary training code into a distributed training job without changing the code, so developers can focus on the model rather than on managing a distributed training system.
  3. Ray Tune: tuning an AI/ML model has been a challenging task for a decade because finding optimal values is complicated; adjusting a single hyperparameter can spawn a new experiment, so the number of experiments grows tremendously. Ray Tune schedules these trials across the cluster.
  4. Ray Serve: to complete the end-to-end story, Ray provides a serving library that deploys AI models without modifying your code, since it wraps your function (or class) and exposes it as an API (a minimal sketch follows this list).
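
To illustrate item 4, here is a minimal Ray Serve sketch (my own example, not from the talk): a regular class is turned into a replicated HTTP endpoint by a decorator.

from ray import serve
from starlette.requests import Request

@serve.deployment(num_replicas=2)
class Echo:
    async def __call__(self, request: Request) -> dict:
        # Ray Serve passes the incoming HTTP request to this method.
        body = await request.json()
        return {"echo": body.get("text", "")}

serve.run(Echo.bind())  # exposes the deployment over HTTP on port 8000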

Benefits of using Ray

Figure 2 — Ray State

Ray isn't the first framework to tackle distributed workloads; there are many others, such as Spark, Dask, and Celery. So why should you consider Ray?

  1. Powerful primitives for distributed computing: Ray's design provides remote task execution and a distributed object store out of the box.
  2. Heterogeneous clusters, auto-scaling, and executor lifecycle management are built into the framework, so developers can focus on machine learning rather than cluster configuration (see the short sketch after Figure 3).
  3. Rich integration with popular ML libraries lets users keep working with their favourite tools, so developers stay in a familiar environment.
  4. Kubernetes operator support (KubeRay): the framework is designed for distributed computing and developed with Kubernetes in mind, so users can deploy and manage Ray clusters through a Kubernetes operator.

Figure 3 — Example libraries supported in Ray
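
To make point 2 concrete, here is a small illustrative sketch (an assumption of typical usage, not from the talk) of how tasks declare their resource needs; Ray places them on matching nodes and, with autoscaling enabled, grows or shrinks the cluster accordingly.

import ray

ray.init()

@ray.remote(num_gpus=1)
def fine_tune(shard_id: int) -> str:
    # Scheduled only on a node that has a free GPU.
    return f"shard {shard_id}: fine-tuned"

@ray.remote(num_cpus=4)
def preprocess(shard_id: int) -> str:
    # CPU-only work can land on cheaper CPU nodes of the same cluster.
    return f"shard {shard_id}: preprocessed"

refs = [preprocess.remote(i) for i in range(4)]
print(ray.get(refs))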

Why Ray on Vertex AI?

Figure 4 — Ray on Vertex AI Component

  1. Cost and time savings on maintaining Ray infrastructure: provisioning and maintaining a Ray cluster can be tedious if you don't have a platform engineer (or an engineer with the relevant knowledge) to look after it. Vertex AI provides the Ray infrastructure for you, so you can focus on developing your software.
  2. Being a GCP product, it integrates with other GCP products such as Cloud Logging, Cloud Monitoring, BigQuery, and GCS, which increases productivity when developing software on GCP.
  3. The library and API are the same as self-hosted Ray, since this is Ray on Vertex AI!

Demo with LLMOps

For this demonstration, I will show the training part of LLMOps. You can use this concept for continuous training (CT) by sending a training task from your pipeline to the Ray cluster, as illustrated in the diagram below.

Figure 5 — Train LLM model

For the LLM, we will not train it from scratch but fine-tune it: LLMs have large architectures and need huge amounts of training data, so building a model from scratch wouldn't be a good idea. This article selects a foundation model and fine-tunes it on custom data using Ray.

  • Load dataset: load the data from Hugging Face into a Ray Dataset.
  • Preprocess dataset: use Ray Data functions to tokenize the data.
  • Fine-tune model: use Ray Train together with the Hugging Face Trainer to fine-tune the foundation model.
  • Tune model: use Ray Tune for hyperparameter tuning.

Setup Ray cluster on Ray on VertexAI

Figure 6 — Create cluster page

Search for “Ray on Vertex AI” in the console, select the desired region, and click “CREATE CLUSTER”; a page for creating a cluster will appear. On the create page, set the cluster’s name and metadata.

Figure 7 — Cluster config page

In the compute settings we configure the machines; in this case I select two A100 GPUs. You may wonder why so much GPU power is needed: this is normal for fine-tuning, and it is also why I don't recommend training a foundation model from scratch. Training from scratch means giving up the broad context the foundation model already learned during pre-training, which may not benefit your application. After that, we need to set up a network with private service access so the cluster can reach other GCP services.

Once the cluster is provisioned, we can use Google Colab Enterprise to access it.

Figure 8 — Access through Google Colab Enterprise

Setup dependency and connection

First, we need to set up the dependencies and the cluster connection to send jobs from the local environment to the cluster. You can follow the script below.

%pip install peft==0.5.0
%pip install datasets==2.12.0 bitsandbytes==0.40.0 einops==0.6.1 trl==0.4.7
%pip install torch==2.1.0 accelerate==0.21.0 tokenizers transformers==4.38.1
%pip install pyarrow==8.0.0
%pip install protobuf==3.20.0
%pip install xformers==0.0.22

Next, we import the required dependencies and initialise the Vertex AI library.

import ray
import torch
import transformers
import warnings
import numpy as np
from huggingface_hub import login
from datasets import load_dataset
from typing import Any, Dict, List, Optional
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, TrainingArguments, Trainer
from peft import prepare_model_for_kbit_training, LoraConfig, get_peft_model
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster, MAX_NUM_WORKER_NODES
from ray.data.preprocessors import BatchMapper
from google.cloud import aiplatform
from google.cloud.aiplatform.preview import vertex_ray

transformers.set_seed(42)
warnings.simplefilter("ignore")
aiplatform.init(project='<PROJECT_ID>', location='us-central1')

After that, we initialise the cluster connection so Ray can send tasks to the cluster and use the cluster's resources instead of your local machine's. This connection can also install dependencies and set environment variables on the cluster.

# Initialize connection to the Ray on Vertex AI Cluster
if ray.is_initialized():
    ray.shutdown()

ray.init(
    address='vertex_ray://projects/311317147984/locations/us-central1/persistentResources/cluster-20240224-082335',
    runtime_env={
        "pip": [
            "transformers==4.38.1",
            "tokenizers==0.15.1",
            "torch==2.0.1",
            "peft==0.5.0",
            "datasets==2.12.0",
            "bitsandbytes==0.40.0",
            "einops==0.6.1",
            "trl==0.4.7",
            "accelerate==0.21.0",
            "pyarrow==8.0.0",
            "protobuf==3.20.0",
            "xformers==0.0.22",
            "ipython"
        ],
        "env_vars": {
            "PYTORCH_CUDA_ALLOC_CONF": "max_split_size_mb:512"
        }
    }
)

Load and prepare dataset

In this demo, I will use the mental health counselling conversations dataset from Hugging Face as the fine-tuning data. We can download the dataset with the Hugging Face library, exactly as described in Hugging Face's documentation.

dataset_name = "Amod/mental_health_counseling_conversations"
hf_dataset = load_dataset(dataset_name)

Next, we need a preprocessing function for fine-tuning the LLM, which involves two steps.

  1. Format into prompt template
  2. Tokenize a template

For the prompt template, I created a prompt template for a mental health case, which is shown in the code below.

INTRO_BLURB = "Below is a question and response about mental health question. Write a response that appropriately completes the request."
QUESTION_KEY = "<start_of_turn>user"
END_USER_KEY = "<end_of_turn>"
RESPONSE_KEY = "<start_of_turn>model"

PROMPT_FORMAT = """{intro}

{question_key}
{question}{end_key}

{response_key}
{response}
""".format(
    intro=INTRO_BLURB,
    question_key=QUESTION_KEY,
    question="{question}",
    response_key=RESPONSE_KEY,
    response="{response}",
    end_key=END_USER_KEY
)


def apply_prompt_template(data):
    question = data["Context"]
    response = data["Response"]

    full_prompt = PROMPT_FORMAT.format(question=question, response=response)

    return {"text": full_prompt}

def generate_prompt(prompt):
    return apply_prompt_template(prompt)

Next, we tokenize the prompts using the Hugging Face tokenizer together with Ray. I use Gemma-2b-it as both the tokenizer and the model to fine-tune. We write a custom function and pass it to Ray Data's BatchMapper so it can run as part of the training pipeline.

model_name = "google/gemma-2b-it"
hf_token = '<HUGGING_FACE_TOKEN>'
max_length = 2048
batch_size = 4096

login(token=hf_token)

tokenizer = AutoTokenizer.from_pretrained(
    model_name,
    trust_remote_code=True,
    padding_side="left",
    add_eos_token=True,
    add_bos_token=True
)
tokenizer.pad_token = tokenizer.eos_token

def preprocess_function(batch: Dict[str, Any]) -> Dict[str, Any]:
    result = tokenizer(
        list(batch["text"]),
        max_length=max_length,
        truncation=True,
        padding="max_length",
        return_tensors="np",
    )

    result["labels"] = result["input_ids"].copy()

    return dict(result)

batch_preprocessor = BatchMapper(preprocess_function, batch_format="pandas", batch_size=batch_size)

# Apply the prompt template to the train split (this produces the "text" column used above),
# then convert the Hugging Face dataset into a Ray Dataset.
hf_train_dataset = hf_dataset["train"].map(generate_prompt)
ray_train_dataset = ray.data.from_huggingface(hf_train_dataset)

Fine-tune model

To fine-tune Gemma-2b-it, we can follow Hugging Face's documentation for training an LLM. Integrating with Ray additionally requires a trainer-init function that is compatible with Ray Train, which you can write by following the Ray documentation, as shown in the code below.

def trainer_init_per_worker(
    train_dataset: ray.data.Dataset,
    eval_dataset: Optional[ray.data.Dataset] = None,
    **config,
) -> Trainer:
    device = torch.device("cuda")

    login(token=config.get("hf_token"))

    tokenizer = AutoTokenizer.from_pretrained(
        pretrained_model_name_or_path=config.get("model_name"),
        trust_remote_code=True,
        padding_side="left",
        add_eos_token=True,
        add_bos_token=True
    )
    tokenizer.pad_token = tokenizer.eos_token

    num_gpus = torch.cuda.device_count()
    max_memory = config.get("max_memory")

    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16
    )

    model = AutoModelForCausalLM.from_pretrained(
        pretrained_model_name_or_path=config.get("model_name"),
        device_map={'': torch.cuda.current_device()},  # fix for >3 GPUs
        quantization_config=bnb_config,
        max_memory={i: max_memory for i in range(num_gpus)},
        trust_remote_code=True
    )

    model.gradient_checkpointing_enable()
    model = prepare_model_for_kbit_training(model)

    peft_config = LoraConfig(
        lora_alpha=config.get("lora_alpha", 16),
        lora_dropout=config.get("lora_dropout", 0.1),
        r=config.get("lora_r", 64),
        bias="none",
        task_type="CAUSAL_LM",
        target_modules=[
            "q_proj",
            "k_proj",
            "v_proj",
            "o_proj",
            "gate_proj",
            "up_proj",
            "down_proj",
            "lm_head",
        ]  # Choose all linear layers from the model
    )

    model = get_peft_model(model, peft_config)

    training_arguments = TrainingArguments(
        output_dir="checkpoints",
        per_device_train_batch_size=config.get("per_device_train_batch_size", 1),
        gradient_accumulation_steps=config.get("gradient_accumulation_steps", 1),
        optim="paged_adamw_8bit",
        save_steps=config.get("save_steps", 50),
        logging_steps=config.get("logging_steps", 50),
        learning_rate=config.get("learning_rate", 2e-4),
        bf16=False,
        fp16=True,
        max_grad_norm=0.3,
        max_steps=config.get("max_steps", 100),
        weight_decay=config.get("weight_decay", 0.001),
        logging_strategy="steps",
        save_strategy="steps",
        warmup_ratio=config.get("warmup_ratio", 0.03),
        group_by_length=False,
        lr_scheduler_type=config.get("lr_scheduler_type", "constant"),
        ddp_find_unused_parameters=False,
        push_to_hub=False,
        disable_tqdm=False,
    )

    trainer = Trainer(
        model=model,
        train_dataset=train_dataset,
        args=training_arguments,
        tokenizer=tokenizer,
    )
    model.config.use_cache = False

    return trainer

After preparing the training code, we configure Ray Train: the dataset, the trainer-init function, the preprocessor, and the scaling configuration.
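
The configuration below references a few variables (max_memory, logging_steps, save_steps, max_steps, num_workers, use_gpu, num_gpu_per_worker, resource_per_worker_int) whose definitions are not shown here. The values below are only an illustrative assumption matching the two-A100 cluster created earlier; adjust them to your own setup.

# Example values only -- adjust to your cluster and budget.
max_memory = "15GB"           # per-GPU memory budget passed to from_pretrained
logging_steps = 10
save_steps = 50
max_steps = 100

num_workers = 2               # one Ray Train worker per A100 in the cluster
use_gpu = True
num_gpu_per_worker = 1
resource_per_worker_int = 8   # CPUs reserved for each worker

With these defined, the trainer itself is configured as follows.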

from ray.air.config import RunConfig, ScalingConfig, CheckpointConfig
from ray.train.huggingface import HuggingFaceTrainer

trainer = HuggingFaceTrainer(
    trainer_init_per_worker=trainer_init_per_worker,
    trainer_init_config={
        "model_name": model_name,
        "hf_token": hf_token,
        "max_memory": max_memory,
        "logging_steps": logging_steps,
        "save_steps": save_steps,
        "max_steps": max_steps
    },
    scaling_config=ScalingConfig(
        num_workers=num_workers,
        use_gpu=use_gpu,
        resources_per_worker={"GPU": num_gpu_per_worker, "CPU": resource_per_worker_int}
    ),
    datasets={
        "train": ray_train_dataset
    },
    run_config=RunConfig(
        checkpoint_config=CheckpointConfig(
            num_to_keep=1,
            checkpoint_score_attribute="loss",
            checkpoint_score_order="min",
        ),
        verbose=0
    ),
    preprocessor=batch_preprocessor
)


After that, we can fine-tune the model by calling the .fit() function.

result = trainer.fit()

Figure 9 — Test of the fine-tuned model
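
Figure 9 shows a quick test of the fine-tuned model. A minimal sketch of such a test is shown below, assuming the LoRA adapter from the best checkpoint has been saved to a local directory (adapter_dir, the prompt, and the generation settings are all illustrative).

from peft import PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer

adapter_dir = "checkpoints/best"  # illustrative path to the saved LoRA adapter

base_model = AutoModelForCausalLM.from_pretrained(model_name, device_map="auto")
tuned_model = PeftModel.from_pretrained(base_model, adapter_dir)
test_tokenizer = AutoTokenizer.from_pretrained(model_name)

# Reuse the prompt template from earlier, leaving the response empty.
prompt = PROMPT_FORMAT.format(question="I feel anxious all the time. What should I do?", response="")
inputs = test_tokenizer(prompt, return_tensors="pt").to(tuned_model.device)
outputs = tuned_model.generate(**inputs, max_new_tokens=128)
print(test_tokenizer.decode(outputs[0], skip_special_tokens=True))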

Hyperparameter tuning

Next, to tune the hyperparameters, we can use Ray Tune, which provides the same interface as Ray Train, as illustrated in the code below.

from ray import tune
from ray.tune import Tuner
from ray.tune.schedulers.async_hyperband import ASHAScheduler

total_num_trials = 4
max_tune_steps = 20

tuner = Tuner(
    trainer,
    param_space={
        "trainer_init_config": {
            "learning_rate": tune.choice([1e-5, 1e-4, 1e-3, 1e-2]),
            "max_steps": tune.choice([5, 10, 15, max_tune_steps]),
        }
    },
    tune_config=tune.TuneConfig(
        metric="loss",
        mode="min",
        num_samples=total_num_trials,
        scheduler=ASHAScheduler(
            max_t=max_tune_steps,
        ),
    ),
    run_config=RunConfig(
        checkpoint_config=CheckpointConfig(
            num_to_keep=1,
            checkpoint_score_attribute="loss",
            checkpoint_score_order="min",
        )
    ),
)

We launch the tuning job with the same .fit() call as Ray Train.

tune_results = tuner.fit()

We can then retrieve the results with the following script.

tune_results_df = tune_results.get_dataframe().sort_values("loss")
best_result = tune_results.get_best_result()

print(best_result.checkpoint)
print(best_result.metrics)

Bonus : Recap from Ray Summit 2023

Bonus! In September 2023, Ray Summit 2023 brought together amazing speakers from around the world, and I would like to share some of the interesting sessions.

Ray LLM

Figure 10 — Ray LLM from the Anyscale team — https://www.youtube.com/watch?v=TJ5K1CO9Wbs

Ray-LLM: behind the scenes, top-tier companies such as OpenAI, Uber, and Cohere rely on Anyscale to help deploy LLM models. The Anyscale platform lets developers serve LLMs and was developed by the Ray team. Here I would like to share some details about their tool, Ray-LLM, which addresses three technical problems in serving LLMs.

  • Auto-scaling and serving on the right GPU: managing workloads and nodes for auto-scaling is a tedious task, but Ray handles it and can select the right GPU for each model, since serving multiple models of varying sizes on one fixed GPU type wouldn't be a good idea.

Figure 11 — Ray LLM from the Anyscale team — https://www.youtube.com/watch?v=TJ5K1CO9Wbs

  • Continuous batching: with naive batching, GPUs are under-utilised because sequences in a batch have different lengths; continuous batching instead appends new input sequences to the end of the running token batch to fill it, which increases the throughput of the system.

Figure 12 — Speculative decoding

  • Speculative decoding: a small model speculates K tokens ahead and a large model then verifies them, keeping the accepted tokens and rejecting the first one the large model disagrees with. This gives faster progress per forward pass and reduces latency, since the large model only verifies, which it can do in parallel (a toy sketch follows this list).
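
To make the idea concrete, here is a toy greedy-verification sketch of speculative decoding (my own illustration, not Ray-LLM's implementation; the gpt2/gpt2-large pair is just an assumption of a small/large model sharing a tokenizer).

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

draft = AutoModelForCausalLM.from_pretrained("gpt2")         # small "speculator"
target = AutoModelForCausalLM.from_pretrained("gpt2-large")  # large "verifier"
tok = AutoTokenizer.from_pretrained("gpt2")

@torch.no_grad()
def speculative_step(input_ids: torch.Tensor, k: int = 4) -> torch.Tensor:
    """One greedy speculative-decoding step: draft proposes k tokens, target verifies."""
    # 1. The draft model proposes k tokens autoregressively (cheap forward passes).
    proposal = draft.generate(input_ids, max_new_tokens=k, do_sample=False)
    drafted = proposal[:, input_ids.shape[1]:]

    # 2. The target model scores the whole proposal in a single forward pass.
    logits = target(proposal).logits
    # The target's greedy choice for token i comes from the logits at position i-1.
    verify = logits[:, input_ids.shape[1] - 1:-1, :].argmax(dim=-1)

    # 3. Accept the longest prefix where draft and target agree, then append
    #    the target's own token at the first disagreement (or after the prefix).
    matches = (drafted == verify)[0].long()
    n_accept = int(matches.cumprod(dim=0).sum())
    next_token = logits[:, input_ids.shape[1] - 1 + n_accept, :].argmax(dim=-1, keepdim=True)
    return torch.cat([input_ids, drafted[:, :n_accept], next_token], dim=-1)

ids = tok("Ray makes distributed computing", return_tensors="pt").input_ids
print(tok.decode(speculative_step(ids)[0]))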

Hybrid routing

Figure 13 — Developing and Serving RAG-Based LLM in Production — https://www.youtube.com/watch?v=YO9jYy-HIRY

Another interesting technique is to use a supervised classifier to classify each incoming query and route it to a suitable model before it ever reaches an LLM. This helps when building LLM-based agents, since each LLM has its own strengths per task, so it is better to let the model with the most relevant context answer the question.
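
A minimal sketch of what such a router could look like (everything here — the route labels, the keyword rules, and the dummy models — is a hypothetical stand-in; in practice the classifier would be a trained model and each route would call a separate LLM deployment).

from typing import Callable, Dict

# Hypothetical specialised "models" -- in practice these would call
# different LLM deployments (e.g. Ray Serve endpoints).
def code_model(query: str) -> str:
    return f"[code model] {query}"

def counselling_model(query: str) -> str:
    return f"[counselling model] {query}"

def general_model(query: str) -> str:
    return f"[general model] {query}"

ROUTES: Dict[str, Callable[[str], str]] = {
    "code": code_model,
    "mental_health": counselling_model,
    "general": general_model,
}

def classify_query(query: str) -> str:
    # Stand-in for the supervised classifier described above; a real router
    # would use a trained text classifier rather than keyword matching.
    lowered = query.lower()
    if "python" in lowered or "error" in lowered:
        return "code"
    if "anxious" in lowered or "stress" in lowered:
        return "mental_health"
    return "general"

def route(query: str) -> str:
    # Classify first, then forward the query to the most suitable model.
    return ROUTES[classify_query(query)](query)

print(route("I feel anxious about my exams"))  # handled by the counselling model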

Summary

In this article, I covered the benefits of using Ray in AI/ML projects, which you can apply to LLMOps or MLOps, since Ray gives developers the flexibility to integrate with their frameworks and to encapsulate functions as tasks that run on the cluster as distributed computations. Although I only demonstrated the model-training part of the LLMOps lifecycle, in a following article I'd like to discuss how to deploy an LLM into production using Ray or BentoML with vLLM.
