End-to-End MLOps Pipelines for Sentiment Analysis on Azure using Terraform, Kubeflow v2, MlFlow and Seldon: Part5

Rachit Ahuja
Aug 25, 2023

Part 1: Introduction and Architecture planning

Part 2: Developing workflows for infrastructure deployment via CI-CD

Part 3: Setup MLflow on AKS

Part 4: Setup Kubeflow and Seldon on AKS

Part 5: End-to-End training pipeline and Inference Deployment using Seldon

In this final installment (Part 5), we run an end-to-end training pipeline for a sentiment analysis model. The pipeline covers several stages: creating an MLflow experiment to track the training runs, downloading and preprocessing the data, training several models, and finally registering the best model based on its accuracy metrics. To keep the implementation simple, we port-forward the Kubeflow Pipelines endpoint to a local port and trigger the training pipeline through API calls. In real-world scenarios, these training runs are typically scheduled as recurring jobs. The entire repository containing the pipeline and source code can be accessed here. It contains three subdirectories: one with the source code for each step, one with the pipeline code that uses the base images to launch the training pipeline, and one with the inference code and its dependent files.

The following Kubeflow script triggers the training pipeline. It can be integrated into CI/CD workflows or invoked manually:

# Note: kfp.v2 and kfp.dsl.PipelineExecutionMode are part of the KFP 1.x SDK.
import kfp
import kfp.dsl as dsl
from kfp.v2.dsl import (
    component,
    Input,
    Output,
    Artifact,
)
from typing import NamedTuple

# Step 1: set up the MLflow experiment and return the experiment and run IDs.
@component(base_image="racahu23/blog_mlflow:1", packages_to_install=['mlflow'])
def mlflow_setup_experiment(tracking_uri: str) -> NamedTuple('Outputs', [("exp_id", str), ("run_id", str)]):
    import main
    op = main.main(tracking_uri)
    return op


# Step 2: download the Twitter sample data and preprocess it.
@component(base_image="racahu23/preprocess:blog", packages_to_install=['mlflow', 'boto3'])
def twitter_download_preprocess(information: Output[Artifact], experiment_id: str, run_id: str, tracking_uri: str):
    from main import twitter_sample_download_and_preprocess
    args = {
        "experiment_id": experiment_id,
        "run_id": run_id,
        "tracking_uri": tracking_uri,
        "log_folder": information.path
    }
    twitter_sample_download_and_preprocess(args)


# Step 3: NumPy processing of the preprocessed data.
@component(base_image="racahu23/numpy:blog_final", packages_to_install=['mlflow', 'boto3'])
def numpy_process(information: Input[Artifact], information_output: Output[Artifact], experiment_id: str, run_id: str,
                  tracking_uri: str):
    from main import numpy_process
    args = {
        "experiment_id": experiment_id,
        "run_id": run_id,
        "tracking_uri": tracking_uri,
        "log_folder": information.path,
        "output_dir": information_output.path
    }
    op = numpy_process(args)
    return op


# Steps 4-7: training steps, each reading the NumPy step's output.
@component(base_image="racahu23/scikit:3", packages_to_install=['boto3'])
def sklearn_logistic(information_input: Input[Artifact], experiment_id: str,
                     tracking_uri: str, run_id: str, sklearn_output: Output[Artifact]):
    from main import sklearn_logistic
    args = {
        "experiment_id": experiment_id,
        "tracking_uri": tracking_uri,
        "log_folder": information_input.path,
        "output_dir": sklearn_output.path,
        "run_id": run_id
    }
    op = sklearn_logistic(args)


@component(base_image="racahu23/logistic:2", packages_to_install=['mlflow', 'boto3'])
def logistic_op(sklearn_input: Input[Artifact], logistic_output: Output[Artifact], experiment_id: str, run_id: str,
                tracking_uri: str):
    from main import logistic
    args = {
        "experiment_id": experiment_id,
        "run_id": run_id,
        "tracking_uri": tracking_uri,
        "log_folder": sklearn_input.path,
        "output_dir": logistic_output.path
    }
    op = logistic(args)


@component(base_image="racahu23/torch:1", packages_to_install=['mlflow', 'boto3'])
def torch_op(logistic_input: Input[Artifact], torch_output: Output[Artifact],
             experiment_id: str, run_id: str, tracking_uri: str):
    from main import torch_process_logistic
    args = {
        "experiment_id": experiment_id,
        "run_id": run_id,
        "tracking_uri": tracking_uri,
        "log_folder": logistic_input.path,
        "output_dir": torch_output.path
    }
    op = torch_process_logistic(args)


@component(base_image="racahu23/svm:1", packages_to_install=['mlflow', 'boto3'])
def svm_op(svm_input: Input[Artifact], svm_output: Output[Artifact],
           experiment_id: str, run_id: str, tracking_uri: str):
    from main import svm_process
    args = {
        "experiment_id": experiment_id,
        "run_id": run_id,
        "tracking_uri": tracking_uri,
        "log_folder": svm_input.path,
        "output_dir": svm_output.path
    }
    op = svm_process(args)


# Step 8: register the model in MLflow.
@component(base_image="racahu23/register:4", packages_to_install=['mlflow', 'python-dotenv'])
def register_op(run_id: str, tracking_uri: str):
    from main import register_model
    args = {
        "run_id": run_id,
        "tracking_uri": tracking_uri,
    }
    op = register_model(args)

@dsl.pipeline(
    name='Twitter nltk pipeline',
    description='Writing code by the other way.'
)
def pipeline(mlflow_uri: str):
    pvc_name = "twitter-5000"
    # Disabled volume setup, kept for reference:
    # vop = dsl.VolumeOp(
    #     name=pvc_name,
    #     resource_name="twitter-5000",
    #     size="1Gi",
    #     modes=dsl.VOLUME_MODE_RWM
    # )
    op_mlflow = mlflow_setup_experiment(tracking_uri=mlflow_uri)
    exp_id = op_mlflow.outputs["exp_id"]
    run_id = op_mlflow.outputs["run_id"]

    download_task = twitter_download_preprocess(
        experiment_id=exp_id, run_id=run_id, tracking_uri=mlflow_uri)
    numpy_task = numpy_process(
        information=download_task.outputs['information'],
        experiment_id=exp_id, run_id=run_id, tracking_uri=mlflow_uri).after(download_task)

    # Every training step reads the NumPy step's output; .after() runs them one at a time.
    sklearn_task = sklearn_logistic(
        information_input=numpy_task.outputs["information_output"],
        experiment_id=exp_id, tracking_uri=mlflow_uri, run_id=run_id).after(numpy_task)
    logistic_task = logistic_op(
        sklearn_input=numpy_task.outputs["information_output"],
        experiment_id=exp_id, run_id=run_id, tracking_uri=mlflow_uri).after(sklearn_task)
    torch_task = torch_op(
        logistic_input=numpy_task.outputs["information_output"],
        experiment_id=exp_id, run_id=run_id, tracking_uri=mlflow_uri).after(logistic_task)
    svm_task = svm_op(
        svm_input=numpy_task.outputs["information_output"],
        experiment_id=exp_id, run_id=run_id, tracking_uri=mlflow_uri).after(torch_task)
    register_task = register_op(tracking_uri=mlflow_uri, run_id=run_id).after(svm_task)


if __name__ == "__main__":
    # The host is the locally port-forwarded Kubeflow Pipelines endpoint.
    client = kfp.Client(namespace="rahuja23", host="http://localhost:8080")
    client.create_run_from_pipeline_func(
        pipeline,
        arguments={"mlflow_uri": "http://mlflow.use-case.svc.cluster.local:5000"},
        mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE,
        enable_caching=False,
    )

The script efficiently triggers the training pipeline, orchestrating each stage along the way.
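
To make the execution order explicit, the data dependencies and `.after()` calls in the pipeline function can be modelled as a dependency graph. The snippet below is only an illustrative sketch using Python's standard `graphlib` module (Python 3.9+); the step names are shorthand labels chosen here, not Kubeflow identifiers. It shows that the steps form a single chain, so the four training steps run one after another rather than in parallel.

```python
from graphlib import TopologicalSorter

# Each key maps a step to the steps it must wait for. The edges mirror the
# data dependencies and .after() calls in the pipeline function; the labels
# are shorthand chosen for this sketch.
dependencies = {
    "mlflow_setup": set(),
    "download_preprocess": {"mlflow_setup"},
    "numpy_process": {"mlflow_setup", "download_preprocess"},
    "sklearn_logistic": {"mlflow_setup", "numpy_process"},
    "logistic": {"mlflow_setup", "numpy_process", "sklearn_logistic"},
    "torch": {"mlflow_setup", "numpy_process", "logistic"},
    "svm": {"mlflow_setup", "numpy_process", "torch"},
    "register": {"mlflow_setup", "svm"},
}


def execution_order(deps):
    """Return one valid execution order for the dependency graph."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(" -> ".join(order))
```

Removing the `.after()` edges between the training steps from this graph would leave them depending only on the setup and NumPy steps, which is the shape to aim for if the models should train in parallel.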

End-to-End Kubeflow Pipeline

Once the pipeline has finished, the final step uses the results logged by the individual training runs to register the best-performing model in MLflow.
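
The selection logic inside the registration step is not shown in this post. As a rough sketch, assuming each training step logs an accuracy value, picking the model to register could look like the following. The model names and accuracy values are made up for illustration, and `select_best_model` is a hypothetical helper, not a function from the repository.

```python
from typing import Dict, Tuple


def select_best_model(accuracies: Dict[str, float]) -> Tuple[str, float]:
    """Return the (model name, accuracy) pair with the highest accuracy."""
    if not accuracies:
        raise ValueError("no candidate models were logged")
    best_name = max(accuracies, key=accuracies.get)
    return best_name, accuracies[best_name]


# Illustrative values only; real numbers would come from the MLflow runs.
logged_accuracies = {
    "sklearn_logistic": 0.81,
    "torch_logistic": 0.79,
    "svm": 0.80,
}

best_name, best_accuracy = select_best_model(logged_accuracies)
print(f"Registering {best_name} (accuracy={best_accuracy:.2f})")
```

In a real pipeline the chosen model would then be handed to the MLflow model registry, for example through `mlflow.register_model`.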

MLflow Server with logged Experiment Data

The focus now shifts to deploying the best registered model with Seldon.

Model Deployment using Seldon

In this section, we deploy the best model as a Kubernetes deployment with the help of Seldon. Seldon offers an open-source platform, “Seldon Core,” that helps data scientists and engineers deploy and manage machine learning models on Kubernetes clusters.

The Inference folder’s structure is outlined below:

Inference
|-- Dockerfile
|-- Inference.py
|-- depl.yaml
|-- requirements.txt
`-- sklearn_folder
    |-- Sentiment-LR.pickle
    `-- vectoriser-ngram-(1,2).pickle

To deploy the model, follow these steps:

  • Create a Custom Wrapper: Write the custom wrapper code with the required functionality. This can include preprocessing logic, post-processing logic, or any other custom behavior. Seldon Core provides wrappers for several languages; ours is written in Python.
    In our case Inference.py looked like this:

import re
import string
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
from nltk.tokenize import TweetTokenizer
from nltk.stem import WordNetLemmatizer
import pickle
import os
import logging

log = logging.getLogger()


class Inference(object):
    def __init__(self):
        self.model_loaded = False
        self.tokenizer = TweetTokenizer(preserve_case=False, strip_handles=True, reduce_len=True)
        self.stopwords_en = stopwords.words('english')
        self.punctuation_en = string.punctuation
        self.stemmer = PorterStemmer()
        self.sklearn_folder = "sklearn_folder"
        self.emojis = {':)': 'smile', ':-)': 'smile', ';d': 'wink', ':-E': 'vampire', ':(': 'sad',
                       ':-(': 'sad', ':-<': 'sad', ':P': 'raspberry', ':O': 'surprised',
                       ':-@': 'shocked', ':@': 'shocked', ':-$': 'confused', ':\\': 'annoyed',
                       ':#': 'mute', ':X': 'mute', ':^)': 'smile', ':-&': 'confused', '$_$': 'greedy',
                       '@@': 'eyeroll', ':-!': 'confused', ':-D': 'smile', ':-0': 'yell', 'O.o': 'confused',
                       '<(-_-)>': 'robot', 'd[-_-]b': 'dj', ":'-)": 'sadsmile', ';)': 'wink',
                       ';-)': 'wink', 'O:-)': 'angel', 'O*-)': 'angel', '(:-D': 'gossip', '=^.^=': 'cat'}

        ## Defining set containing all stopwords in english.
        self.stopwordlist = ['a', 'about', 'above', 'after', 'again', 'ain', 'all', 'am', 'an',
                             'and', 'any', 'are', 'as', 'at', 'be', 'because', 'been', 'before',
                             'being', 'below', 'between', 'both', 'by', 'can', 'd', 'did', 'do',
                             'does', 'doing', 'down', 'during', 'each', 'few', 'for', 'from',
                             'further', 'had', 'has', 'have', 'having', 'he', 'her', 'here',
                             'hers', 'herself', 'him', 'himself', 'his', 'how', 'i', 'if', 'in',
                             'into', 'is', 'it', 'its', 'itself', 'just', 'll', 'm', 'ma',
                             'me', 'more', 'most', 'my', 'myself', 'now', 'o', 'of', 'on', 'once',
                             'only', 'or', 'other', 'our', 'ours', 'ourselves', 'out', 'own', 're',
                             's', 'same', 'she', "shes", 'should', "shouldve", 'so', 'some', 'such',
                             't', 'than', 'that', "thatll", 'the', 'their', 'theirs', 'them',
                             'themselves', 'then', 'there', 'these', 'they', 'this', 'those',
                             'through', 'to', 'too', 'under', 'until', 'up', 've', 'very', 'was',
                             'we', 'were', 'what', 'when', 'where', 'which', 'while', 'who', 'whom',
                             'why', 'will', 'with', 'won', 'y', 'you', "youd", "youll", "youre",
                             "youve", 'your', 'yours', 'yourself', 'yourselves']

    def preprocess(self, textdata):
        processedText = []

        # Create the lemmatizer.
        wordLemm = WordNetLemmatizer()

        # Defining regex patterns.
        urlPattern = r"((http://)[^ ]*|(https://)[^ ]*|( www\.)[^ ]*)"
        userPattern = r'@[^\s]+'
        alphaPattern = "[^a-zA-Z0-9]"
        sequencePattern = r"(.)\1\1+"
        seqReplacePattern = r"\1\1"

        for tweet in textdata:
            tweet = tweet.lower()

            # Replace all URLs with 'URL'.
            tweet = re.sub(urlPattern, ' URL', tweet)
            # Replace all emojis.
            # Note: the tweet is already lowercased, so keys with uppercase letters (e.g. ':P') never match.
            for emoji in self.emojis.keys():
                tweet = tweet.replace(emoji, "EMOJI" + self.emojis[emoji])
            # Replace @USERNAME with 'USER'.
            tweet = re.sub(userPattern, ' USER', tweet)
            # Replace all non-alphanumeric characters with a space.
            tweet = re.sub(alphaPattern, " ", tweet)
            # Replace 3 or more consecutive identical characters with 2.
            tweet = re.sub(sequencePattern, seqReplacePattern, tweet)
            tweetwords = ''
            for word in tweet.split():
                # Checking if the word is a stopword.
                # if word not in stopwordlist:
                if len(word) > 1:
                    # Lemmatizing the word.
                    word = wordLemm.lemmatize(word)
                    tweetwords += (word + ' ')

            processedText.append(tweetwords)
        return processedText

    def load_models(self):
        '''
        Load the pickled vectoriser and LR model from self.sklearn_folder.
        '''
        # Load the vectoriser.
        with open(os.path.join(self.sklearn_folder, 'vectoriser-ngram-(1,2).pickle'), 'rb') as file:
            vectoriser = pickle.load(file)
        # Load the LR Model.
        with open(os.path.join(self.sklearn_folder, 'Sentiment-LR.pickle'), 'rb') as file:
            LRmodel = pickle.load(file)
        return vectoriser, LRmodel

    def predict(self, X, features_names=None):
        # Load the models lazily on the first request.
        if not self.model_loaded:
            print("LOADING MODEL")
            self.vectorizer, self.model = self.load_models()
            self.model_loaded = True
        # Predict the sentiment.
        log.info("X: %s", X)
        textdata = self.vectorizer.transform(self.preprocess(X))
        log.info("Transformed Data")
        sentiment = self.model.predict(textdata)
        # Map each input text to its predicted sentiment.
        log.info("Got Predictions")
        data = {}
        emotion_dict = {0: "Sad", 1: "Happy"}
        for text, pred in zip(X, sentiment):
            data[text] = emotion_dict[pred]
        log.info("Sending Predictions back!!")
        return data
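
To see what the regex-based cleaning in `preprocess` does, the following standalone sketch applies the same patterns to a single tweet. It deliberately leaves out the emoji replacement and the lemmatization step so that it runs without NLTK, and it joins the words without the trailing space the wrapper produces. `clean_tweet` is a helper name introduced here for illustration.

```python
import re

# Same patterns as in the wrapper's preprocess() method.
URL_PATTERN = r"((http://)[^ ]*|(https://)[^ ]*|( www\.)[^ ]*)"
USER_PATTERN = r"@[^\s]+"
ALPHA_PATTERN = "[^a-zA-Z0-9]"
SEQUENCE_PATTERN = r"(.)\1\1+"
SEQ_REPLACE_PATTERN = r"\1\1"


def clean_tweet(tweet: str) -> str:
    """Apply the regex cleaning steps in the same order as the wrapper."""
    tweet = tweet.lower()
    tweet = re.sub(URL_PATTERN, " URL", tweet)        # URLs -> 'URL'
    tweet = re.sub(USER_PATTERN, " USER", tweet)      # @handles -> 'USER'
    tweet = re.sub(ALPHA_PATTERN, " ", tweet)         # punctuation -> space
    tweet = re.sub(SEQUENCE_PATTERN, SEQ_REPLACE_PATTERN, tweet)  # 'oooo' -> 'oo'
    # Drop single-character tokens, as the wrapper does.
    return " ".join(word for word in tweet.split() if len(word) > 1)


print(clean_tweet("@bob I loooove this!!! https://example.com"))
# prints: USER loove this URL
```

The order of the substitutions matters: URLs and handles have to be replaced before the punctuation is stripped, otherwise the `://` and `@` markers that the first two patterns rely on would already be gone.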

  • Containerize the Wrapper: Package the custom wrapper code into a Docker image. The Dockerfile specifies the base image, installs the necessary dependencies, and copies the wrapper code into the image.
    In our case the Dockerfile looked like this:
FROM python:3.8-slim
WORKDIR /app
COPY . .
RUN pip install -r requirements.txt
RUN [ "python3", "-c", "import nltk; nltk.download('stopwords', download_dir='/usr/local/nltk_data')"]
RUN [ "python3", "-c", "import nltk; nltk.download('wordnet', download_dir='/usr/local/nltk_data')"]
ENV MODEL_NAME=Inference
ENV API_TYPE=REST
ENV SERVICE_TYPE=MODEL
ENV PERSISTENCE=0
EXPOSE 5000
EXPOSE 9000
RUN chown -R 8888 /app
CMD exec seldon-core-microservice $MODEL_NAME --service-type $SERVICE_TYPE
  • Build and Push Docker Image: Build the Docker image from the Dockerfile and push it to a container registry, replacing the image name and tag below with your own.
    I am working on an M1 Mac, which has an ARM-based processor, while the Kubernetes nodes use amd64 processors, so I passed the additional “--platform” flag when building the Docker image.
docker build -t racahu23/b_inference:final --platform linux/amd64 .
docker push racahu23/b_inference:final
  • Create Seldon Deployment Configuration: Write a Seldon deployment configuration YAML file that references the custom wrapper image. The configuration also names the model to serve in the inference graph, along with other essential settings.
    In our case the deployment YAML looked like this:
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: blogpost-model
  namespace: seldon
spec:
  predictors:
  - componentSpecs:
    - spec:
        containers:
        - name: transformer
          image: racahu23/b_inference:final
    graph:
      name: transformer
      type: MODEL
      endpoint:
        service_port: 5000
    name: model
    replicas: 1
  • Apply the Configuration: Apply the Seldon deployment configuration to your Kubernetes cluster using kubectl apply.
kubectl apply -f depl.yaml

This would create a Seldon deployment in our “seldon” namespace.

All resources in seldon namespace
  • Accessing API Docs for Deployed Model: Once the Seldon Deployment reaches the “ready” state, the API documentation for the deployed model is available at a URL of the following form:
    http://<ingress_url>/seldon/<namespace>/<model-name>/api/v1.0/doc/
    In our case the API documentation was available at the following URL: http://20.73.143.214/seldon/seldon/blogpost-model/api/v1.0/doc/
API docs for Seldon Model
  • Model Testing: Finally, test the deployed model by sending data for prediction with Python’s requests module. The response confirms that the deployment works end to end.
    The test script in this case looks like this:
import json
import requests

headers = {"Content-Type": "application/json"}

text = [
    "I hate twitter",
    "May the Force be with you.",
    "Mr. Stark, I don't feel so good",
]

# Seldon expects the inputs wrapped as {"data": {"ndarray": [...]}}.
data = json.dumps({"data": {"ndarray": text}})
response = requests.post(
    'http://20.73.143.214:80/seldon/seldon/blogpost-model/api/v1.0/predictions',
    data=data,
    headers=headers,
)
# The dict returned by Inference.predict() comes back under "jsonData".
print(response.json()['jsonData'])

Once this script is run, the results should look like the following:

Prediction results from Seldon Model
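
For reference, the request body and the response handling can be exercised without a running cluster. The sketch below only builds the payload and parses a hand-written sample response; the sample labels are illustrative and not actual model output, and `build_payload` and `extract_predictions` are helper names introduced here.

```python
import json


def build_payload(texts):
    """Wrap raw strings in the {"data": {"ndarray": [...]}} body used above."""
    return json.dumps({"data": {"ndarray": list(texts)}})


def extract_predictions(response_body):
    """Return the dict that the wrapper's predict() method sent back."""
    return response_body["jsonData"]


payload = build_payload(["I hate twitter", "May the Force be with you."])
print(payload)

# Hand-written sample in the shape the test script reads; not real model output.
sample_response = {
    "jsonData": {"I hate twitter": "Sad", "May the Force be with you.": "Happy"},
}
for text, label in extract_predictions(sample_response).items():
    print(f"{label:>5}: {text}")
```

Because the wrapper keys its result by the input text, sending the same string twice in one request yields a single entry in the response.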

With these steps, we have built an integrated MLOps pipeline for sentiment analysis on Azure using Terraform, Kubeflow v2, MLflow, and Seldon, covering infrastructure deployment, experiment tracking, end-to-end training, and model serving. If you have questions or would like to collaborate, please don’t hesitate to reach out. Until then, let’s continue exploring and sharing knowledge about emerging technologies.

Rachit Ahuja

Machine learning and Data Engineer at Data Reply GmbH