Kubeflow Pipelines — Some Useful Tips

Revathi Prakash
4 min read · Feb 12, 2022


Kubeflow Pipelines

Kubeflow Pipelines is a platform for building and deploying portable, scalable machine learning (ML) workflows based on Docker containers. The main goals of the Kubeflow pipeline are end-to-end orchestration, ease of experimentation, and reusability. A pipeline is a description of an ML workflow, including all of the components in the workflow and how they combine in the form of a graph. You can read more here.

An example pipeline looks like the one shown below.

Fig 1: Sample Kubeflow Pipeline

While creating the various tasks that constitute a pipeline, we can reuse components that are already available. Below are some of the common ways to reuse existing components.

Reuse Existing Kubeflow Components

1. Custom Function Execution

One of the most common use cases is to run custom code as part of the pipeline. If you have written your code in Python, the easiest way to get this to work is to use func_to_container_op.

First, we need a base image in which the custom code will be executed. For example, we can pick

gcr.io/deeplearning-platform-release/tf-cpu.2-6

as our starting Docker image, since it already has all the libraries we need.

Sample code

from kfp import dsl
from kfp.components import func_to_container_op

base_image = "gcr.io/deeplearning-platform-release/tf-cpu.2-6"

def preprocess(df):
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    # Scale all numeric columns in place
    scaler = StandardScaler()
    numeric_cols = list(df.dtypes[df.dtypes != 'object'].index)
    df.loc[:, numeric_cols] = scaler.fit_transform(df.loc[:, numeric_cols])
    return df

preprocess_operator = func_to_container_op(preprocess, base_image=base_image)

@dsl.pipeline(name='test pipeline', description='Sample pipeline test')
def pipeline(df):
    preprocess_task = preprocess_operator(df=df)

You can then run the pipeline using the custom code you have created. This operator can be used to execute any custom Python code that we can mount on a Docker image and add as part of the pipeline.
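For reference, a common way to run it is to submit the pipeline function directly with the KFP client. Below is a minimal sketch assuming the kfp v1 SDK; the endpoint, input path, and experiment name are placeholders.

import kfp

# Placeholder endpoint of your Kubeflow Pipelines installation
client = kfp.Client(host='http://<your-kfp-endpoint>')

# Compile and submit the pipeline defined above in one call
client.create_run_from_pipeline_func(
    pipeline,
    arguments={'df': 'gs://<your-bucket>/input.csv'},  # placeholder pipeline argument
    experiment_name='custom-code-experiment',          # hypothetical experiment name
)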

2. Loop/Parallel Execution

If we want to execute the same code over a set of values, we can use dsl.ParallelFor to do it in parallel, as in the code below.

Sample Code

In this case, op1 would be executed twice: once with args=['echo 1'] and once with args=['echo 2']:

with dsl.ParallelFor([{'a': 1, 'b': 10}, {'a': 2, 'b': 20}]) as item:
    op1 = ContainerOp(..., args=['echo {}'.format(item.a)])
    op2 = ContainerOp(..., args=['echo {}'.format(item.b)])
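For completeness, here is a minimal runnable sketch of the same pattern, assuming the kfp v1 SDK and using a lightweight Python component in place of the ContainerOp stubs above (echo, echo_op, and parallel_pipeline are illustrative names):

from kfp import dsl
from kfp.components import func_to_container_op

def echo(message: str) -> str:
    # Print the value handed in by the loop and return it
    print(message)
    return message

echo_op = func_to_container_op(echo)

@dsl.pipeline(name='parallel echo', description='Fan out the same step over a list of items')
def parallel_pipeline():
    items = [{'a': 1, 'b': 10}, {'a': 2, 'b': 20}]
    with dsl.ParallelFor(items) as item:
        # Each iteration runs as its own task in parallel
        echo_op(message=item.a)
        echo_op(message=item.b)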

3. Reuse publicly available components

We can make use of publicly available components to create pipeline tasks. The example below launches a Dataflow job as a task in the pipeline.

import json
from kfp import components as comp

dataflow_python_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/1.7.0-rc.3/components/gcp/dataflow/launch_python/component.yaml')

args = json.dumps([
    '--output', output_dir,
    '--service_account_email', service_account,
    '--subnetwork', subnetwork,
])

dataflow_python_op(
    python_file_path=python_file_path,
    project_id=project_id,
    staging_dir=staging_dir,
    requirements_file_path=requirements_file_path,
    args=args,
    region=region,
    wait_interval=wait_interval)
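In practice the component call sits inside a @dsl.pipeline function, and downstream tasks can be ordered with .after(). Below is a minimal sketch assuming the kfp v1 SDK; the pipeline name, parameter list, and the gs:// output path are placeholders.

import json
from kfp import dsl, components as comp

dataflow_python_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/1.7.0-rc.3/components/gcp/dataflow/launch_python/component.yaml')

@dsl.pipeline(name='dataflow pipeline', description='Launch a Dataflow job as a pipeline task')
def dataflow_pipeline(python_file_path: str, project_id: str, staging_dir: str,
                      requirements_file_path: str, region: str, wait_interval: int = 30):
    # Arguments forwarded to the Beam job; the path here is a placeholder
    args = json.dumps(['--output', 'gs://<your-bucket>/output'])

    dataflow_task = dataflow_python_op(
        python_file_path=python_file_path,
        project_id=project_id,
        staging_dir=staging_dir,
        requirements_file_path=requirements_file_path,
        args=args,
        region=region,
        wait_interval=wait_interval)

    # Any downstream step can be ordered explicitly, e.g.:
    # next_task = some_other_op(...).after(dataflow_task)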

The above should be enough to create the different tasks that constitute an ML workflow pipeline. In addition to these parameters, we can use Kubeflow to track the metrics of each pipeline run, so we can compare results across different iterations of experimentation.

4. Add Metrics for ML models

Define an Output[Metrics] argument in your component function, then log scalar data using log_metric(metric: str, value: float). You can log any number of metrics by calling this API multiple times; metric is the name of the metric and value is its value.

Sample code:

from kfp.v2 import dsl
from kfp.v2.dsl import component, Output, Metrics

@component(
    packages_to_install=['sklearn'],
    base_image='python:3.9',
)
def digit_classification(metrics: Output[Metrics]):
    from sklearn import model_selection
    from sklearn.linear_model import LogisticRegression
    from sklearn import datasets
    from sklearn.metrics import accuracy_score

    # Load the iris dataset
    iris = datasets.load_iris()

    # Create feature matrix
    X = iris.data

    # Create target vector
    y = iris.target

    # Test size
    test_size = 0.33
    seed = 7

    # Cross-validation settings
    kfold = model_selection.KFold(n_splits=10, random_state=seed, shuffle=True)

    # Model instance
    model = LogisticRegression()
    scoring = 'accuracy'
    results = model_selection.cross_val_score(model, X, y, cv=kfold, scoring=scoring)

    # Split data
    X_train, X_test, y_train, y_test = model_selection.train_test_split(
        X, y, test_size=test_size, random_state=seed)

    # Fit model
    model.fit(X_train, y_train)

    # Accuracy on test set, logged as a metric for the run
    result = model.score(X_test, y_test)
    metrics.log_metric('accuracy', (result * 100.0))


@dsl.pipeline(name='metrics-visualization-pipeline')
def metrics_visualization_pipeline():
    digit_classification_op = digit_classification()
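To actually see these metrics in the UI, the pipeline is compiled with the v2-compatible compiler and then submitted as a run. Below is a minimal sketch; the output package path is a placeholder.

from kfp.v2 import compiler

# Compile the pipeline into a job spec that can be submitted to
# Kubeflow Pipelines (v2-compatible mode) or Vertex AI Pipelines
compiler.Compiler().compile(
    pipeline_func=metrics_visualization_pipeline,
    package_path='metrics_visualization_pipeline.json',  # placeholder output path
)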

The above are the main components that are useful for creating a Kubeflow pipeline that defines an ML workload. You can read more about Kubeflow in the official documentation.

Please let me know in the comments if you have any questions or suggestions. Happy reading.
