MLOps with Kubeflow-pipeline V2, mlflow, Seldon Core: Part 2
This is second part of the four parts MLOps series.
Part 1: Introduction to the basic concepts and installation on local system.
Part 2: Understanding the kubeflow pipeline and components.
Part 3: Understanding the Mlflow server UI for logging parameters, code versions, metrics, and output files.
Part 4: Deploying model with Seldon core server over kubernetes.
Explanation of the Kubeflow pipeline’s each Component.
As you can see that there are five main components in this pipeline starting with the dataset-download. Each component in a pipeline executes independently. In the pipeline the components can be made to execute sequentially . A component may produce output artifact to be consumed by the other component in the pipeline. We will see this later in this tutorial
src
└── components
├── data_download
├── model_train_cnn
├── model_inference
├── register_model
├── model_eval ( predict-on-sample-image)
Note:- For simplicity, I will only add the snippet of this long code. I will share the git repository link later in the tutorial.
- data-download
In this component we are simply downloading our dataset from a remote url, unzipping the content of the zip in a directory mounted by local PVC, no fancy stuff. PVC for the pipeline is created at the time of the pipeline run and is deleted once the pipeline is finished.
For the dataset I am using data provided in below course. This dataset provides total of 300 images divided into train and test folders with 3 output classes Pizza, Steak and Sushi.
https://github.com/mrdbourke/pytorch-deep-learning/raw/main/data/pizza_steak_sushi.zip"
%%writefile src/components/data_download/data_download_component.py
from kfp import dsl
@dsl.component(base_image='python:3.10-slim',
target_image='mohitverma1688/data-download_component:v0.2',
packages_to_install=['pathlib','boto3','requests','kfp-kubernetes'])
def dataset_download(url: str, base_path:str,
):
import os
import requests
import zipfile
from pathlib import Path
# Save zip files to S3 import_bucket
data_path = Path(base_path)
if data_path.is_dir():
print(f"{data_path} directory exists.")
else:
print(f"Did not find {data_path} directory, creating one...")
data_path.mkdir(parents=True,exist_ok=True)
# Download pizza , steak and sushi data and upload to the s3 bucket. This is example code to save the downloaded data to the bucket
with open(f"{data_path}/data.zip", "wb") as f:
request = requests.get(f"{url}")
print(f"Downloading data from {url}...")
f.write(request.content)
# unzip the data to use the data in the next step. Data will be stored in PVC.
with zipfile.ZipFile(data_path/"data.zip", "r") as zip_ref:
print("Unzipping data...")
zip_ref.extractall(data_path)
- model_train
In this component, dataset is loaded, model is created and then model is trained on the training dataset with the helper python files in the directory. The main feature of the pipeline is the input and output artifacts. In this component there are three main output artifacts as show in the figure model_train step.
@dsl.component(base_image='mohitverma1688/model_train_component:v0.1',
target_image='mohitverma1688/model_train_component:v0.1',
packages_to_install=['pandas','matplotlib']
)
def model_train(num_epochs:int,
batch_size:int,
hidden_units:int,
learning_rate: float,
train_dir: str,
test_dir: str,
model_name: str,
model_dir: str,
model_artifact_path: OutputPath('Model'),
parameters_json_path: OutputPath('Artifact'),
train_loss: Output[HTML],
export_bucket: str = "modelbucket",
) -> Dict[str, list] :
....
# Plot train loss in the kfp pipeline for visialization.
tmpfile = BytesIO()
plt.figure(figsize=(15,10))
plt.subplot(2,2,1)
plt.plot(epochs, df["train_loss"], label={model_name})
plt.title("Train Loss")
plt.xlabel("Epochs")
plt.legend()
plt.savefig(tmpfile, format="png")
tmpfile.seek(0)
encoded = base64.b64encode(tmpfile.read()).decode("utf-8")
train_loss.path = f"{train_loss.path}.html"
html = f"<img src='data:image/png;base64,{encoded}'>"
with open(train_loss.path, 'w') as f:
f.write(html)
....
# Saving the model as kfp output artifcat.
torch.save(model.state_dict(),
model_artifact_path)
# Below steps logs the parameters to be used in the next step to be used by mlflow server
with open(parameters_json_path, 'w') as f:
json.dump({'lr': learning_rate, 'batch_size': batch_size, 'epochs': num_epochs}, f)
Now we will explore the output artifacts train_loss which is special as this is an html visualisation in the pipeline. Also there are 2 other output artifacts, one is the model_artifact_path ( location to save the model file) which will be consumed by the next component to load the model state dict. parameter_json_path is used to register the parameters which are needed by the mlflow model logging step.
- model_inference
This component is to load the model and run inference on the test dataset in the dataset downloaded from the url. The main highlight of this step is the consumption of the Input Artificat from the previous step. This step also produces two outputs artifacts test_loss and metrics for the run as shown in the figure 2. Also notice the Output Parameters block of this step, which provides the inference results. Just like the artifacts, these parameters can also be used by the next component in the pipeline.
@dsl.component(base_image='mohitverma1688/model_train_component:v0.1',
target_image='mohitverma1688/model_inference_component:v0.1',
packages_to_install=['pandas','matplotlib']
)
def model_inference(model_artifact_path: InputPath('Model'),
num_epochs: int,
batch_size:int,
learning_rate: float,
train_dir: str,
test_dir: str,
model_name: str,
test_loss: Output[HTML],
metrics_json_path: OutputPath('Artifact')
) -> Dict[str, str]:
...
# Set up device
device = "cuda" if torch.cuda.is_available() else "cpu"
# Load the model
model = model_builder.TinyVGG(input_shape=3,
hidden_units=10,
output_shape=3).to(device)
model.load_state_dict(torch.load(model_artifact_path))
...
# Below output parameters will be used by the mlflow to log the model in the next step.
with open(metrics_json_path, 'w') as f:
json.dump({'accuracy': test_acc_last_epoch, 'loss': test_loss_last_epoch}, f)
return {"model_name" : model_name,
"test_acc_last_epoch": test_acc_last_epoch,
"test_loss_last_epoch": test_loss_last_epoch}
- register_model
In this component, we are consuming the input artifcats from the previous components. Next we are logging the experiment details and the model in the mlflow server. The mlflow default artifacts storage is s3 minio bucket which was installed as part of kubeflow installation. This pipeline step also produces the output parameters for providing the location of the registered model in the mlflow server minio s3 artifact store, which will be used in the next component of the pipeline. I will explain the mlflow server in part 3.
@dsl.component(base_image='mohitverma1688/model_train_component:v0.1',
target_image='mohitverma1688/register_model_component:v0.1',
packages_to_install=['mlflow','GitPython','numpy']
)
def register_model(parameters_json_path: InputPath('Artifact'),
metrics_json_path: InputPath('Artifact'),
model_artifact_path: InputPath('Model'),
experiment_name: str,
aws_access_key_id: str,
aws_secret_access_key: str,
) -> dict:
...
# Load the parameters and metrics for mlflow to register.
with open(metrics_json_path) as f:
metrics = json.load(f)
print(metrics)
with open(parameters_json_path) as f:
parameters = json.load(f)
print(parameters)
...
#Define the mlflow tracking URI and experiment name
tracking_uri = 'http://my-mlflow.kubeflow:5000'
experiment_name = experiment_name
mlflow.tracking.set_tracking_uri(tracking_uri)
mlflow.set_experiment(experiment_name)
reg_model_name = "CNN-TinyVG-Model"
with mlflow.start_run() as run:
for metric in metrics:
mlflow.log_metric(metric, metrics[metric])
mlflow.log_params(parameters)
artifact_path = "CNN-TinyVG"
mlflow.pytorch.log_model(model,
registered_model_name=reg_model_name,
signature=signature,
artifact_path=artifact_path,
code_paths=['/tmp/model_builder.py'])
...
return {"artifact_path": artifact_path, "artifact_uri": run.info.artifact_uri}
- model_eval
This step is not mandatory to add, but I have added this step to test the model registered in the mlflow server and predict the class on the sample image. This steps in the pipeline highlights the usage of NamedTuple for multiple output parameters (model_uri and pred_label_class) in the pipeline. Also you will see next how the previous output parameters (artifact_path & artifact_uri) of the component register_model will be used in this step from the pipeline. The output parameter model_uri has absolute path of the model in the s3 bucket named modeloutputs.
ef predict_on_sample_image(test_dir: str,
model_info: dict,
image_path: str,
aws_access_key_id: str,
aws_secret_access_key: str ,
) -> NamedTuple('outputs', [('model_uri', str),('pred_label_class', str)]):
...
artifact_path = model_info["artifact_path"]
artifact_uri = model_info["artifact_uri"]
# Loading the model from the mlFlow artifact repository
mlflow.set_tracking_uri("http://my-mlflow.kubeflow:5000")
model_uri = f"{artifact_uri}/{artifact_path}"
model = mlflow.pytorch.load_model(model_uri)
...
# Predict on image
model.eval()
with torch.inference_mode():
# Put image on the model
image = image.to(device)
# Get the pred_logits
pred_logits = model(image.unsqueeze(dim=0)) # Adding a new dimension for the batch size.
# Get the pred probs
pred_prob = torch.softmax(pred_logits, dim=1)
# Get the pred_labels
pred_label = torch.argmax(pred_prob, dim=1)
pred_label_class = class_names[pred_label]
print(f"[INFO] Pred class: {pred_label_class}, Pred_prob: {pred_prob.max():.3f}")
pred_prob_max = pred_prob.max()
print(type(pred_prob_max))
outputs = NamedTuple("outputs", model_uri=str, pred_label_class=str)
return outputs(model_uri, pred_label_class)
The complete pipeline :
Below you can see how to use kfp sdk to stitch all the components and then compile them into a kubeflow-demo2.yaml. We can also use the kfp sdk to run the pipeline from the cli as shown in the code. Main thing to note here is the creation of the pvc and deletion of the pvc after the pipeline run. Also, see how the tasks are defined to run sequentially using .after syntax. The bold part in the pipeline defines the connections between pipeline as defined in the components.
The option set_caching_options can be set to cache the results of the pipeline steps. This can help if you need to run a specific component in the subsequent run .
%%writefile pipeline.py
from kfp import kubernetes
from kubernetes import client, config
import base64
from kfp import dsl
from kfp import compiler
from src.components.data_download.data_download_component import dataset_download
from src.components.model_train_cnn.model_train_component import model_train
from src.components.model_inference.model_inference_component import model_inference
from src.components.register_model.register_model_component import register_model
from src.components.model_eval.model_eval_component import predict_on_sample_image
BASE_PATH="/data"
URL="https://github.com/mrdbourke/pytorch-deep-learning/raw/main/data/pizza_steak_sushi.zip"
NUM_EPOCHS=10
BATCH_SIZE = 32
HIDDEN_UNITS = 10
LEARNING_RATE = 0.001
MODEL_NAME = "cnn_tinyvg_v1"
MODEL_DIR = "/data/models"
EXPORT_BUCKET = "modeloutput"
TRAIN_DIR = "/data/train"
TEST_DIR = "/data/test"
IMAGE_PATH = "/data/test/pizza/1152100.jpg"
EXPERIMENT_NAME = "CNN-TinyVG-Demo-exp1"
@dsl.pipeline(name='CNN-TinyVG-Demo',
description='This pipeline is a demo for training,evaluating and deploying Convutional Neural network',
display_name='Kubeflow-MlFLow-Demo')
def kubeflow_pipeline(base_path: str = BASE_PATH,
url:str = URL,
batch_size:int = BATCH_SIZE,
train_dir:str = TRAIN_DIR,
test_dir:str = TEST_DIR,
num_epochs: int = NUM_EPOCHS,
hidden_units:int = HIDDEN_UNITS,
learning_rate:float = LEARNING_RATE,
model_name: str = MODEL_NAME,
model_dir: str = MODEL_DIR,
export_bucket: str = EXPORT_BUCKET,
image_path: str = IMAGE_PATH,
experiment_name: str = EXPERIMENT_NAME
):
# Load Kubernetes configuration
config.load_kube_config()
# Fetch the Minio credentials from the secret
secret_name = "minio-credentials"
secret_namespace = "kubeflow"
secret_key_id = "AWS_ACCESS_KEY_ID"
secret_key_access = "AWS_SECRET_ACCESS_KEY"
v1 = client.CoreV1Api()
secret = v1.read_namespaced_secret(secret_name, namespace=secret_namespace)
# Convert bytes to string
aws_access_key_id = base64.b64decode(secret.data[secret_key_id]).decode('utf-8')
aws_secret_access_key = base64.b64decode(secret.data[secret_key_access]).decode('utf-8')
pvc1 = kubernetes.CreatePVC(
# can also use pvc_name instead of pvc_name_suffix to use a pre-existing PVC
pvc_name='kubeflow-pvc1',
access_modes=['ReadWriteOnce'],
size='500Mi',
storage_class_name='standard',
)
task1 = dataset_download(base_path=base_path,
url=url)
task1.set_caching_options(True)
task2 = model_train(batch_size=batch_size,
num_epochs=num_epochs,
train_dir=train_dir,
test_dir=test_dir,
hidden_units=hidden_units,
learning_rate=learning_rate,
model_name=model_name,
model_dir=model_dir,
export_bucket=export_bucket,
).after(task1)
task2.set_caching_options(True)
task3 = model_inference(test_dir=test_dir,
model_artifact_path=task2.outputs["model_artifact_path"],
train_dir=train_dir,
learning_rate=learning_rate,
batch_size=batch_size,
num_epochs=num_epochs,
model_name=model_name
).after(task2)
task3.set_caching_options(False)
task4 = register_model(model_artifact_path=task2.outputs["model_artifact_path"],
parameters_json_path=task2.outputs["parameters_json_path"],
metrics_json_path=task3.outputs["metrics_json_path"],
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
experiment_name=experiment_name).after(task3)
task4.set_caching_options(False)
task5 = predict_on_sample_image(test_dir=test_dir,
image_path=image_path,
model_info=task4.output,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key).after(task4)
task5.set_caching_options(False)
kubernetes.mount_pvc(
task1,
pvc_name=pvc1.outputs['name'],
mount_path='/data',
)
kubernetes.mount_pvc(
task2,
pvc_name=pvc1.outputs['name'],
mount_path='/data',
)
kubernetes.mount_pvc(
task3,
pvc_name=pvc1.outputs['name'],
mount_path='/data',
)
kubernetes.mount_pvc(
task4,
pvc_name=pvc1.outputs['name'],
mount_path='/data',
)
kubernetes.mount_pvc(
task5,
pvc_name=pvc1.outputs['name'],
mount_path='/data',
)
delete_pvc1 = kubernetes.DeletePVC(pvc_name=pvc1.outputs['name']).after(task5)
compiler.Compiler().compile(kubeflow_pipeline, 'kubeflow-demo2.yaml')
# to compile pipeline run
➜ ~ python3 pipeline.py
# to run the pipeline
%%writefile run.py
from kfp.client import Client
client = Client(host='http://localhost:8002')
run = client.create_run_from_pipeline_package(
'kubeflow-demo2.yaml',
)
➜ ~ python3 run.py
In the next article I will explain the artifacts stored in the mlflow server.