
Unleashing the Power of Kubeflow: A Guide to Training Machine Learning Pipelines (Part 2: Hands on)

Ines Benameur
Published in Gnomon Digital
17 min read · Feb 1, 2024


Introduction

In the previous article, we provided an introduction to the Kubeflow ecosystem, its global architecture and a detailed description of Kubeflow Pipelines.

In the subsequent sections, we’ll guide you through the process of defining and creating machine learning components and constructing a pipeline by leveraging the capabilities of Kubeflow Pipelines to optimize your models. Whether you’re a Kubeflow novice or an experienced user looking to enhance your skills, this article will equip you with the knowledge to unlock the full potential of Kubeflow effectively for your machine learning endeavors. Buckle up!

Building an Example Kubeflow Pipeline: From Components to Workflow

Now that we’ve explored the core concepts of defining a Kubeflow Pipeline, let’s take a hands-on approach and walk through the process of crafting a complete pipeline.

For the purpose of this tutorial, we’ll create a simple end-to-end machine learning workflow composed of data preprocessing, model training, and evaluation, using the good old MNIST classification problem.

We will first create a Jupyter notebook server from the pre-built TensorFlow image, then define and run our pipeline from that notebook.

We will pull the data from Keras and create a simple neural network using Keras as well.

Also note that this example uses KFP v2 (2.4.0), which introduces changes compared to v1.

Let’s dig in!

Creating a notebook

  1. Open your Kubeflow UI and go to Notebooks.
  2. Click on + New Notebook.
  3. Add a name to the notebook and select the notebook type. We will use JupyterLab.
  4. Open the Custom Notebook dropdown and select the TensorFlow image. This creates a notebook server with TensorFlow already installed.
  5. Click on Advanced Options, the last section just above the launch button, and open the Configurations dropdown menu. Select Allow access to Kubeflow Pipelines. As we mentioned in the previous article, we added this configuration manually when we deployed Kubeflow.
  6. Click the LAUNCH button.

Defining a component

A component in KFP, acting as a remote function definition, outlines inputs, incorporates user-defined logic, and generates outputs.

When instantiated with input parameters, it transforms into a task within the pipeline.

KFP provides two primary methods for creating components:

  • Python Components: a straightforward way to author components implemented in pure Python. They come in two flavors: Lightweight and Containerized Python Components.
  • Container Components: a more advanced authoring approach, recommended for components that are not implemented in pure Python.

Additionally, Importer Components are pre-built functionalities within KFP, designed to facilitate the importation of artifacts into the pipeline, particularly useful for artifacts not produced by tasks within the pipeline.

Check the documentation for more details about components in Kubeflow.

Our first Python component

Let’s take a look at the definition of our first component in the pipeline, called load_data, which as the name suggests, will pull the data from Keras and create the associated artifacts that will be shared with the rest of the components.

But before we do, let’s first import the required kfp modules:

import kfp
import kfp.dsl as dsl
from kfp.dsl import Input, Output
from kfp.dsl import Dataset, Artifact
from kfp.dsl import Model, Metrics, ClassificationMetrics

from typing import NamedTuple

BASE_IMAGE = 'tensorflow/tensorflow:latest'


@dsl.component(
    base_image=BASE_IMAGE,
)
def load_data(
    x_train_pickle: Output[Dataset],
    y_train_pickle: Output[Dataset],
    x_test_pickle: Output[Dataset],
    y_test_pickle: Output[Dataset],
):
    # import dataset
    from keras.datasets import mnist
    import numpy as np
    import pickle

    # load dataset
    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    # count the number of unique train labels
    unique, counts = np.unique(y_train, return_counts=True)
    print("Train labels: ", dict(zip(unique, counts)))

    # count the number of unique test labels
    unique, counts = np.unique(y_test, return_counts=True)
    print("\nTest labels: ", dict(zip(unique, counts)))

    with open(x_train_pickle.path, "wb") as file:
        pickle.dump(x_train, file)

    with open(y_train_pickle.path, "wb") as file:
        pickle.dump(y_train, file)

    with open(x_test_pickle.path, "wb") as file:
        pickle.dump(x_test, file)

    with open(y_test_pickle.path, "wb") as file:
        pickle.dump(y_test, file)

Let’s go through this implementation line by line:

1. Decorate the function with @dsl.component

Lightweight Python Components are constructed by decorating Python functions with the @dsl.component decorator, which transforms your function into a KFP component to be executed inside a container at pipeline runtime. There are 2 main arguments that we can pass to this decorator, and we used the first one in our first component:

base_image: By default, the KFP version used here runs a component in the python:3.7 image. However, you can override it by providing a Docker image that contains the right Python version and/or the dependencies required to run your component. In this example, we need the Keras package to download the training data, so we used 'tensorflow/tensorflow:latest' as the base image. There are 2 main points that you should keep in mind about base images:

  • The components within the pipeline need not share the same base image, as each component will be built in its own separate container. A best practice is to use an image for each component that contains the minimum requirements for the code to function properly.
  • You can also use your own customized docker image, but you have to push it to a registry that your Kubernetes cluster running Kubeflow has access to.

2. Define the load_data function

Now, let’s examine the function that is set to run inside the container. The function load_data takes 4 arguments of type kfp.dsl.Output.

Before delving into this particular argument type, it’s essential to grasp how Inputs and Outputs are managed within Kubeflow components. Kubeflow components and pipelines accept two types: Parameters and Artifacts.

Parameters serve to convey small amounts of data and can be of types such as int, str, float, bool, list, and dict, and may have default values. So far, this is no different from a standard Python function. Artifacts, however, behave differently from ordinary function arguments.

3. Save into Artifacts

In Kubeflow, artifacts play a pivotal role in capturing, storing, and sharing the outputs generated during the execution of machine learning workflows. An artifact can be a dataset, a model, a set of metrics, a visualization, or any other relevant output produced by pipeline components. Artifacts serve as a means to maintain a traceable lineage of data and models throughout the machine learning lifecycle, facilitating reproducibility and collaboration.

Kubeflow’s artifact tracking allows users to easily log and version artifacts, enabling a comprehensive understanding of the experimental context and aiding in the comparison of different model versions. Whether it’s tracking the evolution of training datasets, storing model checkpoints, or recording performance metrics, artifacts in Kubeflow contribute to the establishment of a robust and transparent machine learning pipeline.

To declare an artifact:

  1. The first step is to specify whether it’s an input or an output artifact, indicating whether it will be consumed by the component or generated by it. Notably, even if the artifact is an object returned by the component, it is declared as an argument of the function.
  2. Next, you must specify the type of the artifact. The most generic and commonly used artifact types include Artifact, Dataset, Model, and Metrics. Artifact serves as the default base type and should be utilized when the artifact type doesn't neatly fit into another category. For more details and specific use cases of artifact types, refer to the Kubeflow documentation on artifacts.

Artifact Store

Now when it comes to storing these Artifacts, Kubeflow provides an Artifact Store, a central repository for storing and managing these input and output artifacts. The Artifact Store helps maintain a consistent record of data lineage, versioning, and metadata associated with each artifact. The storage mechanism is designed to accommodate diverse types of artifacts and supports various storage backends, including cloud-based solutions like Google Cloud Storage , Amazon S3, etc. This flexibility allows users to choose the storage solution that best fits their workflow requirements.

Handling Artifacts

Back to our component: our function generates 4 artifacts of type Dataset, each holding a pickle file, as the argument names suggest. The first 2 store the training data, and the remaining 2 store the test data.

After loading the MNIST data from Keras, we need to store the files in the corresponding path of the output artifact. For that the .path property comes in handy. When you write the contents of your artifact to the location provided by the artifact’s .path attribute, the pipelines backend will handle copying the file at .path to the URI at .uri automatically, allowing you to create artifact files within a component by only interacting with the task’s local filesystem.
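To make the role of .path concrete, here is a minimal sketch that runs outside Kubeflow. LocalArtifact is a hypothetical stand-in, not a KFP class: it only mimics the .path attribute, and a temporary directory plays the part of the task's local filesystem. The copy to .uri performed by the pipelines backend is not simulated.

```python
import pickle
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass
class LocalArtifact:
    """Hypothetical stand-in for a KFP artifact: it only exposes a local path."""
    path: str


def produce(dataset: LocalArtifact) -> None:
    # The producing component writes its output to the artifact's path.
    with open(dataset.path, "wb") as file:
        pickle.dump([1, 2, 3], file)


def consume(dataset: LocalArtifact) -> list:
    # The consuming component reads the same artifact back from its path.
    with open(dataset.path, "rb") as file:
        return pickle.load(file)


with tempfile.TemporaryDirectory() as workdir:
    artifact = LocalArtifact(path=str(Path(workdir) / "x_train_pickle"))
    produce(artifact)
    loaded = consume(artifact)
```

In a real pipeline the two functions would run in different containers, and Kubeflow would move the file between them through the artifact store.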

Data Pre-processing Component

Now let’s look at the next component:

@dsl.component(
    base_image=BASE_IMAGE
)
def preprocess_data(
    x_train_pickle: Input[Dataset],
    y_train_pickle: Input[Dataset],
    x_test_pickle: Input[Dataset],
    y_test_pickle: Input[Dataset],
    x_train_prep: Output[Dataset],
    y_train_prep: Output[Dataset],
    x_test_prep: Output[Dataset],
    y_test_prep: Output[Dataset],
) -> NamedTuple("outputs", [("input_size", int), ("num_labels", int)]):

    from keras.utils import to_categorical
    import numpy as np
    import pickle
    from typing import NamedTuple

    with open(x_train_pickle.path, "rb") as file:
        x_train = pickle.load(file)

    with open(y_train_pickle.path, "rb") as file:
        y_train = pickle.load(file)

    with open(x_test_pickle.path, "rb") as file:
        x_test = pickle.load(file)

    with open(y_test_pickle.path, "rb") as file:
        y_test = pickle.load(file)

    num_labels = len(np.unique(y_train))

    # one-hot encode the labels
    y_train = to_categorical(y_train)
    y_test = to_categorical(y_test)

    image_size = x_train.shape[1]
    input_size = image_size * image_size

    # resize and normalize
    x_train = np.reshape(x_train, [-1, input_size])
    x_train = x_train.astype("float32") / 255
    x_test = np.reshape(x_test, [-1, input_size])
    x_test = x_test.astype("float32") / 255

    with open(x_train_prep.path, "wb") as file:
        pickle.dump(x_train, file)

    with open(y_train_prep.path, "wb") as file:
        pickle.dump(y_train, file)

    with open(x_test_prep.path, "wb") as file:
        pickle.dump(x_test, file)

    with open(y_test_prep.path, "wb") as file:
        pickle.dump(y_test, file)

    # list-of-pairs form of NamedTuple, which works across Python versions
    outputs = NamedTuple("outputs", [("input_size", int), ("num_labels", int)])
    return outputs(input_size, num_labels)

In this component, we preprocess the data exported by the previous component. We flatten each image into a one-dimensional vector and normalize the pixel values, for both the train and test data, and we one-hot encode the labels. Since input artifacts must be handled as immutable entities, modifying the contents of the file at .path is discouraged. Instead, we declare our 4 previous files as inputs and create new Output artifacts that will store the preprocessed data.
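As a rough illustration of what these transformations do to a single sample, here is a dependency-free sketch on a tiny 2x2 "image". The helper names are hypothetical and the component itself relies on NumPy and Keras; this only mirrors the arithmetic (flatten, divide by 255, one-hot encode).

```python
def flatten_and_normalize(image):
    """Flatten a 2D grid of 0-255 pixel values into one list scaled to [0, 1]."""
    return [pixel / 255 for row in image for pixel in row]


def one_hot(label, num_labels):
    """Encode an integer label as a vector with a single 1.0 at its index."""
    return [1.0 if index == label else 0.0 for index in range(num_labels)]


tiny_image = [[0, 255], [51, 102]]
features = flatten_and_normalize(tiny_image)  # 2 * 2 = 4 values
target = one_hot(3, 10)                       # digit 3 among 10 classes
```

For MNIST the same arithmetic gives an input size of 28 * 28 = 784 values per image.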

Another thing to note here is the NamedTuple, as the returned object of this function. When your component is returning multiple parameters as output, they need to be declared as a NamedTuple.
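The following plain-Python sketch, without the KFP decorator, shows how such a NamedTuple behaves. The function name describe_images is hypothetical, and the list-of-pairs form of NamedTuple is used for the field declarations.

```python
from typing import NamedTuple


def describe_images(
    image_size: int, num_labels: int
) -> NamedTuple("outputs", [("input_size", int), ("num_labels", int)]):
    # Build the tuple type inside the function, as a KFP component would,
    # because the function body must be self-contained.
    outputs = NamedTuple("outputs", [("input_size", int), ("num_labels", int)])
    return outputs(image_size * image_size, num_labels)


result = describe_images(28, 10)
print(result.input_size, result.num_labels)
```

Each field name becomes the key under which a downstream task reads the value, for example outputs["input_size"].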

Training and creating the model artifact

Now, let’s get more serious with the train component:

@dsl.component(
    base_image=BASE_IMAGE
)
def train(
    input_size: int,
    num_labels: int,
    epochs: int,
    x_train_pickle: Input[Dataset],
    y_train_pickle: Input[Dataset],
    model_artifact: Output[Model],
    log: Output[Artifact],
):
    from keras.callbacks import TensorBoard
    from keras.models import Sequential
    from keras.layers import Dense, Activation, Dropout
    import pickle
    from datetime import datetime

    batch_size = 128
    hidden_units = 256
    dropout = 0.45

    with open(x_train_pickle.path, "rb") as file:
        x_train = pickle.load(file)

    with open(y_train_pickle.path, "rb") as file:
        y_train = pickle.load(file)

    log_dir = f"{log.path}/logs/fit/{datetime.now().strftime('%Y%m%d-%H%M%S')}"
    tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)

    model = Sequential()
    model.add(Dense(hidden_units, input_dim=input_size))
    model.add(Activation("relu"))
    model.add(Dropout(dropout))
    model.add(Dense(hidden_units))
    model.add(Activation("relu"))
    model.add(Dropout(dropout))
    model.add(Dense(num_labels))
    model.add(Activation("softmax"))

    model.summary()

    model.compile(
        loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"]
    )

    model.fit(
        x_train,
        y_train,
        epochs=epochs,
        batch_size=batch_size,
        callbacks=[tensorboard_callback],
    )

    model.save(model_artifact.path)

To train our model, we create a neural network, using the adam optimizer, and the categorical_crossentropy as the loss function for the training.

The component’s function takes as input the number of pixels per image as the input size, the number of labels, which is 10 (the digits 0 to 9), and the number of epochs to run the training. It also takes the training Dataset artifacts, and outputs 2 artifacts. The first one is of type Model and stores the trained model, which is the output of the command model.save(model_artifact.path).

The next output will be used to store the model logs, which we will use later in Tensorboard to monitor the model. This artifact is not really a Dataset, nor a model artifact or a metrics object, so we just declare it as Artifact.
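For readers who want to see what the softmax output layer and the categorical_crossentropy loss compute for one sample, here is a small pure-Python sketch. It is an illustration of the formulas only, not how Keras implements them, and the function names are chosen for this example.

```python
import math


def softmax(logits):
    """Turn raw scores into probabilities that sum to 1."""
    highest = max(logits)
    exps = [math.exp(value - highest) for value in logits]  # shift for stability
    total = sum(exps)
    return [value / total for value in exps]


def categorical_crossentropy(one_hot_target, probabilities):
    """Negative log-probability assigned to the true class."""
    return -sum(
        target * math.log(probability)
        for target, probability in zip(one_hot_target, probabilities)
        if target > 0
    )


probabilities = softmax([2.0, 1.0, 0.1])
loss = categorical_crossentropy([1, 0, 0], probabilities)
```

The loss shrinks toward 0 as the probability of the true class approaches 1, which is what the optimizer pushes for during training.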

Evaluating and creating metrics artifacts

After the training, we will create an evaluation component, to measure the model performance on the test dataset:

@dsl.component(
    base_image=BASE_IMAGE,
    packages_to_install=["scikit-learn"],
)
def evaluate(
    model_artifact: Input[Model],
    metrics: Output[ClassificationMetrics],
    scalar_metrics: Output[Metrics],
    x_test_pickle: Input[Dataset],
    y_test_pickle: Input[Dataset],
):
    from keras.models import load_model
    from keras.metrics import Precision
    from sklearn.metrics import confusion_matrix
    import numpy as np
    import pickle

    model = load_model(model_artifact.path)

    batch_size = 128

    with open(x_test_pickle.path, "rb") as file:
        x_test = pickle.load(file)

    with open(y_test_pickle.path, "rb") as file:
        y_test = pickle.load(file)

    # class probabilities, one row per test image
    probabilities = model.predict(x_test, batch_size=batch_size)
    # predicted digit = most probable class; taking argmax of the raw
    # probabilities avoids all-zero rows when no class reaches 0.5
    predicted_labels = probabilities.argmax(axis=1)

    metrics.log_confusion_matrix(
        ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"],
        confusion_matrix(
            y_test.argmax(axis=1), predicted_labels
        ).tolist(),  # .tolist() to convert np array to list.
    )

    # Precision applies its default 0.5 threshold to the probabilities
    m = Precision()
    m.update_state(y_test, probabilities)
    loss, acc = model.evaluate(x_test, y_test, batch_size=batch_size)
    scalar_metrics.log_metric("accuracy", acc)
    scalar_metrics.log_metric("loss", loss)
    scalar_metrics.log_metric("precision", m.result().numpy().tolist())

For this component, we added a new argument to the @dsl.component decorator: packages_to_install. This argument takes a list of the packages that are used inside the component but are not available in the base image. In our case, we need scikit-learn to compute the confusion matrix in order to evaluate our model.

We also defined 2 new output artifacts: Metrics and ClassificationMetrics.

Metrics is used for any kind of scalar metric we want to log. All you have to do is call the function log_metric for every scalar and pass in the name of the metric and its value.

ClassificationMetrics, as the name suggests, is used to store classification-specific metrics, namely the confusion matrix and the ROC curve. This will add a visualization in the Kubeflow UI when you run the pipeline.

To output the confusion matrix, call the log_confusion_matrix function with the list of labels as the first argument and the matrix itself, as a list of rows, as the second argument. Here the matrix is computed with scikit-learn's confusion_matrix and converted with .tolist().
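To show the shape of the data involved, here is a small dependency-free sketch that builds a confusion matrix as a list of rows for a 3-class toy problem. The helper names and the sample values are made up for illustration; rows correspond to true labels and columns to predicted labels, the same convention scikit-learn uses.

```python
def argmax(values):
    """Index of the largest value in a list."""
    return max(range(len(values)), key=values.__getitem__)


def confusion_matrix(true_labels, predicted_labels, num_labels):
    """Count (true, predicted) pairs: rows are true labels, columns predictions."""
    matrix = [[0] * num_labels for _ in range(num_labels)]
    for true_label, predicted_label in zip(true_labels, predicted_labels):
        matrix[true_label][predicted_label] += 1
    return matrix


probabilities = [
    [0.70, 0.20, 0.10],
    [0.10, 0.80, 0.10],
    [0.40, 0.35, 0.25],  # no class reaches 0.5, argmax still picks class 0
    [0.20, 0.30, 0.50],
]
true_labels = [0, 1, 1, 2]

predicted_labels = [argmax(row) for row in probabilities]
matrix = confusion_matrix(true_labels, predicted_labels, num_labels=3)
# matrix == [[1, 0, 0], [1, 1, 0], [0, 0, 1]]
```

A nested list like this one is the kind of value that is passed as the second argument when logging the matrix.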

Defining the pipeline

Now that all our components are in place, let’s assemble all the pieces to create the pipeline:

@dsl.pipeline(
    name="mnist_pipeline",
)
def mnist_pipeline(epochs: int):
    data = (
        load_data()
        .set_memory_limit("4G")
        .set_memory_request("4G")
        .set_cpu_limit("2")
        .set_cpu_request("2")
    )
    preprocess = (
        preprocess_data(
            x_train_pickle=data.outputs["x_train_pickle"],
            y_train_pickle=data.outputs["y_train_pickle"],
            x_test_pickle=data.outputs["x_test_pickle"],
            y_test_pickle=data.outputs["y_test_pickle"],
        )
        .set_memory_limit("4G")
        .set_memory_request("4G")
        .set_cpu_limit("1")
        .set_cpu_request("1")
    )
    preprocess.after(data)
    model = (
        train(
            input_size=preprocess.outputs["input_size"],
            num_labels=preprocess.outputs["num_labels"],
            epochs=epochs,
            x_train_pickle=preprocess.outputs["x_train_prep"],
            y_train_pickle=preprocess.outputs["y_train_prep"],
        )
        .set_memory_limit("6G")
        .set_memory_request("6G")
        .set_cpu_limit("1")
        .set_cpu_request("1")
    )
    model.after(preprocess)
    evaluation = (
        evaluate(
            model_artifact=model.outputs["model_artifact"],
            x_test_pickle=preprocess.outputs["x_test_prep"],
            y_test_pickle=preprocess.outputs["y_test_prep"],
        )
        .set_memory_limit("4G")
        .set_memory_request("4G")
        .set_cpu_limit("1")
        .set_cpu_request("1")
    )
    evaluation.after(model)

To define a pipeline, decorate your python function with the decorator: @dsl.pipeline. For this example, we will only pass the name of the pipeline as an argument.

The pipeline function here takes as input an integer, representing the number of epochs. We will see later how we will pass this parameter, when we trigger an execution of the pipeline.

Then we will start calling our declared components, one by one and passing all the required Input parameters and artifacts.

Let’s talk about the set_memory_limit and set_memory_request functions (the same applies to their CPU counterparts).

Remember how we said that each component runs as a separate container in a Kubernetes Pod? These 2 functions relate to Kubernetes and to how Kubeflow is deployed. When you deploy a Kubeflow cluster, you would usually specify resource quotas per namespace, in order to control and restrict resource (memory and CPU) usage per namespace or object. Once quotas are set, each Pod must declare a request and a limit for each resource at creation time, so that the quotas can be enforced. It is also advisable to set requests and limits even if your namespace has no quotas, to prevent a single Pod from monopolizing a node’s resources.

You don’t have to set these values for every task you run in a pipeline, since default values are defined in the Pod manifest files during the Kubeflow deployment. However, you can override them, typically for the training component, when a task requires more resources than the defaults provide.
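One detail worth knowing about strings such as "4G": in Kubernetes quantity notation, G is a decimal suffix (10^9 bytes) while Gi is a binary one (2^30 bytes). The sketch below is a deliberately simplified, hypothetical parser that handles whole numbers and only four suffixes, just to make the difference visible; it is not what Kubernetes or KFP run internally.

```python
# Simplified multipliers for a few Kubernetes memory suffixes.
MULTIPLIERS = {
    "Gi": 2**30,
    "Mi": 2**20,
    "G": 10**9,
    "M": 10**6,
}


def memory_to_bytes(quantity: str) -> int:
    """Convert a whole-number quantity such as "4G" or "4Gi" to bytes."""
    for suffix, multiplier in MULTIPLIERS.items():
        if quantity.endswith(suffix):
            return int(quantity[: -len(suffix)]) * multiplier
    return int(quantity)  # no suffix: plain number of bytes


decimal_limit = memory_to_bytes("4G")   # 4000000000
binary_limit = memory_to_bytes("4Gi")   # 4294967296
```

The two values differ by roughly 7%, which can matter when a training task runs close to its memory limit.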

Another thing to consider here is the way we pass artifacts between components inside a pipeline.

Whether it’s a simple parameter or an artifact, the output of a component is accessed with my_component.outputs['my_output_name'].

Finally, remember how we said earlier that the components in a pipeline are connected in a Directed Acyclic Graph (DAG)? It is the way we pass information between them that sets the direction of the DAG. In other words, if we declare in the pipeline that component B takes as input the output of component A, Kubeflow will automatically run task A before task B when we trigger an execution.

If 2 tasks do not depend on each other’s output, they will be executed in parallel, unless we manually specify an order of execution using .after, as we did above with preprocess.after(data).

Note that this statement is not necessary since the component preprocess already depends on the output of data and it will automatically run after it. We have included it here for demonstration purposes only.
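The way data dependencies determine execution order can be sketched with Python's standard graphlib module. This is only an analogy for how a DAG is ordered, not Kubeflow's scheduler; the dictionary below lists, for each task of our pipeline, the tasks whose outputs it consumes.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks whose outputs it consumes.
dependencies = {
    "load_data": set(),
    "preprocess_data": {"load_data"},
    "train": {"preprocess_data"},
    "evaluate": {"train", "preprocess_data"},
}

# A topological order runs every task after all of its dependencies.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
# ['load_data', 'preprocess_data', 'train', 'evaluate']
```

Because every task here depends on the previous one, only one order is possible, which is why the explicit .after calls add nothing in this pipeline.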

Executing the pipeline

And now, the moment of truth! Let’s get this pipeline going:

client = kfp.Client()
client.create_run_from_pipeline_func(
    mnist_pipeline,
    arguments={"epochs": 30},
    experiment_name="mnist_pipeline",
)

First, you need to create a Client object, in order to communicate with Kubeflow pipelines API.

Using the method create_run_from_pipeline_func, we will trigger a run of the pipeline we defined above.

We also need to pass a dictionary containing all the arguments that the pipeline function expects. In this case we only defined epochs as an argument.

Next, we pass the name of the experiment under which the run will be triggered. Kubeflow will use an existing experiment with that name, or create a new one if it doesn’t exist.

Experiments are a concept used to organize, track, and manage pipeline runs and their associated metadata. An experiment is a named collection of runs that share a common objective, dataset, or configuration. For example, if we would like to experiment with the same ML model and pipeline components while testing different datasets or different hyperparameters, we would trigger all the associated runs under the same experiment. This enables us to easily track and compare the evaluation metrics of those runs.

We will see a concrete example of that in Kubeflow UI.

When executing the cell above, the command outputs the following:

Here we see 2 links to Kubeflow UI, one showing details about the experiment in question, and the other leading to the run we just created. You can also see the generated id for that run in the last line.

Kubeflow UI

Runs

Click on Run details, and let’s go through it:

Under the Graph tab, we can see the DAG associated with the pipeline we created. We can clearly see the dependencies between all the components, along with artifacts produced by each of them.

The nodes with the green icon represent the run steps, i.e, the components, and the nodes with the folder icon are the artifacts.

Also, notice how there are 2 different green icons: a cloud and a check mark. That’s because the run we are showing here, is not the first run we triggered for this pipeline. Steps marked with cloud icons represent tasks that were not executed in the current run but were cached from a prior execution. The check mark icons signify steps that have been successfully executed in the current run.

As we mentioned earlier, KFP implements a caching mechanism that operates at the component level and is designed to store and retrieve the results of component executions. Caching allows the reuse of previously computed results, saving time and resources by avoiding redundant computations. When a pipeline is executed, each component checks the cache for previously stored results before starting execution. If a match is found based on the cache key (generated based on the component’s code, parameters, and input artifacts), the component uses the cached outputs, skipping the execution step.
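The idea of a cache key can be illustrated with a short sketch. This is not KFP's actual implementation: the function below, its argument names, and the example URIs are assumptions made for illustration. It simply hashes the three ingredients listed above, so that any change to one of them produces a different key.

```python
import hashlib
import json


def cache_key(component_source: str, parameters: dict, input_artifact_uris: dict) -> str:
    """Hash the component code, its parameters and its input artifacts."""
    payload = json.dumps(
        {
            "source": component_source,
            "parameters": parameters,
            "inputs": input_artifact_uris,
        },
        sort_keys=True,  # make the key independent of dictionary ordering
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


source = "def train(...): ..."                   # placeholder for the component code
inputs = {"x_train_pickle": "gs://example-bucket/x_train"}  # hypothetical URI

first_run = cache_key(source, {"epochs": 30}, inputs)
same_run = cache_key(source, {"epochs": 30}, inputs)
new_epochs = cache_key(source, {"epochs": 10}, inputs)
```

Under this model, rerunning the pipeline with a different number of epochs changes the key of the train step only, so the data loading and preprocessing steps can still be served from the cache.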

Steps

Click on the train step to closely look at its details:

We can see here a list of all the parameters and their values that were passed as inputs to this component. Input and Output artifacts are provided as well along with their storage paths. For our case, we previously configured Kubeflow to store artifacts in Google Cloud Storage (GCS).

If you go to Task Details, you can see some metadata related to the task id, name, status etc.

The Logs tab provides a detailed and real-time view of the logs generated during the execution by the underlying container running the component.

Artifacts Visualization

Now that our pipeline was successfully executed, we would be interested to see how the model performed on the test data.

Click on the scalar metrics artifact:

Here we see a table of all the scalar metrics we logged in the evaluate component. The Artifact Info tab shows some metadata about the metrics like the name, type, creation time, etc.

We also computed a confusion matrix in the evaluation step, let’s check it out. Click on the metrics artifact:

We can see that KFP generated a nice visualization for the matrix. It looks like our model predicted the correct digit for most of the images in the test data. Not bad!

Comparing Runs

One last thing we will take a look at, is how to benefit from the usage of experiments in Kubeflow. For this run, we put 30 as the number of epochs in the pipeline parameter. Let’s create a new run with a different value, 10 for example, and see how it will impact the model’s performance.

Click on Clone run, and choose the mnist_pipeline experiment that we created earlier. Put 10 as the number of epochs, and click on Start:

This will start a new run with the provided new parameter. After the run is successfully executed, go to Experiments (KFP) in the left side panel, and select the mnist_pipeline, to see all the runs that were triggered under this experiment.

Check the 2 runs that we want to compare and click on Compare runs:

You can see here a first table describing the list of parameters that were given to each run, and a comparison table of all the metrics generated by the pipeline.

We can conclude from the results that the model performed slightly better in the first run, which makes sense, since it was trained for a larger number of epochs.

Now go to the Confusion Matrix tab, and select the generated matrix of each run:

Check the values thoroughly to confirm the conclusion we made while comparing the scalar metrics.

Take it from here

Throughout this tutorial, we looked closely into the fundamental concepts of implementing modular components, defining and executing pipelines, tracking artifacts, and ensuring reproducibility. But this is just the beginning as we didn’t cover other use cases including:

  • Usage of container components that encapsulate more complex tasks.
  • Compilation and usage of component/pipeline YAML files.
  • Usage of data volumes to persistently store datasets and pass them between pipeline steps, as an alternative to Dataset artifacts.

Conclusion

We have now reached the end of our series of articles on Kubeflow Pipelines. Kudos to you for sticking with this tutorial till the end!

As you may have noticed, writing an end-to-end workflow with Kubeflow can feel complicated and a bit challenging to grasp at first, but it gets easier with time and practice.

It’s also worth mentioning that the process becomes more intuitive as you become more familiar with the Kubernetes ecosystem.

Feel free to continue exploring Kubeflow’s other features, such as hyperparameter tuning with Katib and model serving with KServe, which make Kubeflow a powerful ally in the domain of machine learning orchestration.

References

The code presented in this hands-on tutorial is available in this GitHub repository for reference: https://github.com/gnomondigital/gd-kubeflow-tutorial
