Practical MLOps using Kedro, Airflow, MLflow and Microsoft Azure services

Mohamed El Amine BOUDELLA
IRIS by Argon & Co
10 min read · Dec 6, 2023


MLOps is a set of best practices that can be applied by organizations that want to deploy reliable Machine Learning solutions in production.

It is one of the hottest topics right now because it involves automating the entire machine learning lifecycle, from data preparation to model deployment and monitoring. By automating these processes, MLOps helps to reduce the time and effort required to develop and deploy machine learning models, while also improving the accuracy and reliability of these models.

In addition, MLOps helps to address some of the key challenges associated with machine learning workflows, such as version control, experiment reproducibility, and scalability. By providing a standardized framework for developing and deploying machine learning models, MLOps helps to ensure that these models are consistent, reliable, and scalable across different environments.

This article shows a practical implementation of MLOps principles through a concrete example.

MLOps principles and components

To achieve full MLOps, one must apply a guiding set of principles (or best practices). The number and definition of these MLOps principles vary across sources. In fact, an extensive portion of the literature is dedicated to elucidating the definition of MLOps and its associated principles, roles, and responsibilities.

To structure the following sections of this article, we adhere to the nine principles defined in the excellent research paper by D. Kreuzberger et al. (2023).

The figure below shows these principles and how they can be implemented through different technical components that we will present in this article as part of the implemented architecture.

Figure: MLOps principles and their implementation within technical components (D. Kreuzberger et al., 2023).

In this example, Kedro lies at the core of the implementation of MLOps principles. This open-source Python framework offers a structured approach to data pipeline development and facilitates collaboration, reproducibility, and scalability in machine learning projects.

Our use case: Machine Learning forecasting

Before delving into the technical details of the solution, we’ll provide a brief overview of the use case. Our aim was to design and implement a machine learning solution for sell-in forecasting, tailored to a global FMCG company. Sell-in refers to the sales of goods made to intermediary customers (wholesalers and retailers).

Demand planners already had a solution based on traditional methods such as triple exponential smoothing (aka the Holt-Winters method). It generated a statistical forecast that was reviewed each month as part of the demand review process. At the beginning of each month, forecasts are created at the Country × Month × SKU level, projecting sales over the next 18 months.
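
For readers unfamiliar with the baseline, the additive form of triple exponential smoothing can be sketched in a few lines of Python. This is a minimal illustration and not the demand planners' actual tool. The function name, the simple initialization from the first two seasons, and the smoothing constants are all assumptions. Production implementations such as statsmodels' `ExponentialSmoothing` also optimize the constants and can use multiplicative seasonality.

```python
from typing import List, Sequence


def holt_winters_additive(
    series: Sequence[float],
    season_length: int,
    alpha: float,
    beta: float,
    gamma: float,
    horizon: int,
) -> List[float]:
    """Additive Holt-Winters (triple exponential smoothing) point forecasts.

    Assumes at least two full seasons of history for the naive initialization.
    """
    m = season_length
    if len(series) < 2 * m:
        raise ValueError("need at least two full seasons of history")

    # Naive initialization from the first two seasons (an assumption; other schemes exist)
    level = sum(series[:m]) / m
    trend = (sum(series[m:2 * m]) - sum(series[:m])) / (m * m)
    seasonals = [series[i] - level for i in range(m)]

    for t, y in enumerate(series):
        season = seasonals[t % m]
        prev_level = level
        # Level: deseasonalized observation blended with previous level + trend
        level = alpha * (y - season) + (1 - alpha) * (prev_level + trend)
        # Trend: smoothed change in level
        trend = beta * (level - prev_level) + (1 - beta) * trend
        # Seasonal component for this phase of the cycle
        seasonals[t % m] = gamma * (y - level) + (1 - gamma) * season

    n = len(series)
    # h-step-ahead forecast: last level + h * trend + latest seasonal of that phase
    return [level + h * trend + seasonals[(n + h - 1) % m] for h in range(1, horizon + 1)]
```

For monthly data and the 18-month horizon described above, the call would be along the lines of `holt_winters_additive(history, season_length=12, alpha=0.3, beta=0.1, gamma=0.2, horizon=18)`. The modulo indexing reuses the latest seasonal estimates when the horizon exceeds one season.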

To measure forecast quality, performance is assessed using two metrics: Forecast Accuracy based on WAPE (Weighted Absolute Percentage Error) and Forecast Bias.
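
Definitions of these metrics vary between organizations, and the exact formulas used in the project are not given here. A common convention, assumed in the sketch below, is the following:

- WAPE = Σ|actual − forecast| / Σ|actual|
- Forecast Accuracy = 1 − WAPE
- Bias = Σ(forecast − actual) / Σ|actual|, so that a positive bias means over-forecasting

```python
from typing import Sequence, Tuple


def forecast_accuracy_and_bias(
    actuals: Sequence[float], forecasts: Sequence[float]
) -> Tuple[float, float]:
    """Return (forecast accuracy = 1 - WAPE, bias) over aligned actual/forecast pairs.

    Assumed conventions: WAPE = sum(|A - F|) / sum(|A|);
    bias = sum(F - A) / sum(|A|), positive when over-forecasting.
    """
    if len(actuals) != len(forecasts):
        raise ValueError("actuals and forecasts must have the same length")
    total_actual = sum(abs(a) for a in actuals)
    if total_actual == 0:
        raise ValueError("total actual volume is zero; WAPE is undefined")
    abs_error = sum(abs(a - f) for a, f in zip(actuals, forecasts))
    wape = abs_error / total_actual
    bias = sum(f - a for a, f in zip(actuals, forecasts)) / total_actual
    return 1 - wape, bias
```

Because errors are summed before dividing, WAPE weights each Country × SKU combination by its volume. This avoids the instability of per-row percentage errors on small or zero sales.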

Our objective was to propose a machine learning solution whose forecasts (at the same granularity level and over the same 18-month horizon) outperform those of the current solution on both metrics.

We won’t go into the details of the model here, but, as in most machine learning forecasting projects, we built a large set of features, including:

  • both lag and window features on sell in and sell out data at different aggregation levels (world-wide, region-wide, market-wide),
  • calendar features to catch the effect of specific periods in the year,
  • one-hot encoded product attributes,
  • ABC and XYZ product classification features,
  • features that characterize the COVID-19 period, leveraging open-source data such as the Oxford COVID-19 Government Response Tracker (OxCGRT),
  • … and more
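
The lag and window features in the first bullet can be illustrated on a single monthly series. This is a minimal sketch under several assumptions:

- The lags (1, 3 and 12 months), the window sizes and the `build_lag_window_features` name are hypothetical.
- In practice, these features would be computed per aggregation level (world, region, market) with a dataframe library rather than plain lists.

```python
from typing import Dict, List, Optional, Sequence


def build_lag_window_features(
    sales: Sequence[float],
    lags: Sequence[int] = (1, 3, 12),
    windows: Sequence[int] = (3, 6),
) -> List[Dict[str, Optional[float]]]:
    """Build one feature row per month from past values only (no leakage).

    Feature names are hypothetical. A value is None when history is too short.
    """
    rows = []
    for t in range(len(sales)):
        row: Dict[str, Optional[float]] = {}
        for lag in lags:
            # Value observed `lag` months before month t
            row[f"lag_{lag}"] = sales[t - lag] if t - lag >= 0 else None
        for w in windows:
            # Rolling mean of the w months strictly before month t
            past = sales[max(0, t - w):t]
            row[f"rolling_mean_{w}"] = sum(past) / w if len(past) == w else None
        rows.append(row)
    return rows
```

Using only values strictly before month t keeps the features consistent with the forecasting setup, where the current month's sales are not yet known.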

The following sections will detail which technical components we leveraged to implement the aforementioned MLOps principles.

Proposed architecture on Microsoft Azure

Figure: MLOps platform on Microsoft Azure, combining Azure resources and open-source tools (Kedro, Airflow, MLflow…).

Source Code Repository (Principles of Versioning of code and Collaboration)

For this specific deployment, we leveraged the existing data platform on Microsoft Azure and Azure DevOps. In particular, we used Azure Repos to manage code versioning and facilitate collaborative development.

CI/CD Component (CI/CD Automation)

For CI/CD, we used Azure Pipelines to push our code to the Azure file share associated with the Azure Machine Learning workspace whenever code is pushed to or merged into the main branch.

Data storage and Model Registry (Versioning of data and model, and Reproducibility)

An Azure Data Lake Storage Gen2 (ADLS Gen2) account was used to store all our input, intermediate and output (final forecasts) data. It also serves as a model registry. In our case, Kedro enables data versioning on the data lake and allows us to parameterize our pipelines. This gives us the flexibility to run the same pipeline with different configurations (e.g., dev, test, and prod environments).

Let’s look briefly at the input data and at the capabilities Kedro offers in this case.

Our input data in the Azure data platform is extensive and diverse, encompassing historical sell-in, historical sell-out, product attributes and more. These data are stored in Parquet format on ADLS Gen2 and queried with SQL through a Dremio data lakehouse.

One of the main advantages of using Kedro to read data is that it supports many data sources out of the box. However, it has no native support for querying data through Dremio. Fortunately, Kedro lets us create a custom dataset by extending the AbstractDataSet class (renamed AbstractDataset in later Kedro versions).

Here’s an example of a custom DremioQueryDataSet declared in the catalog.yaml file.

```yaml
dataset_name:
  type: ml_project.extras.datasets.dremio_dataset.DremioQueryDataSet
  query: |
    SELECT *
    FROM table
  credentials: dremio_credentials
```
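
Kedro custom datasets implement three private methods: `_load`, `_save` and `_describe`. The catalog passes the remaining YAML keys (here `query` and `credentials`) to the constructor as keyword arguments.

The authors' `DremioQueryDataSet` is not shown, so the sketch below only mirrors that shape. To stay self-contained, it does not import Kedro. It also uses Python's built-in `sqlite3` as a stand-in for a Dremio connection. A real implementation would subclass Kedro's abstract dataset class and connect through one of Dremio's client interfaces (ODBC/JDBC or Arrow Flight). The class name and the `database` credential key are hypothetical.

```python
import sqlite3
from contextlib import closing
from typing import Any, Dict, List, Tuple


class SQLQueryDatasetSketch:
    """Read-only dataset mirroring the shape of a Kedro custom dataset.

    Hypothetical stand-in: sqlite3 replaces the Dremio connection so the
    example runs anywhere; in Kedro this class would subclass the abstract dataset.
    """

    def __init__(self, query: str, credentials: Dict[str, Any]) -> None:
        self._query = query
        # Hypothetical credential key; a Dremio connection would need host, port, user...
        self._database = credentials["database"]

    def _load(self) -> List[Tuple[Any, ...]]:
        # Open a connection, run the catalog query, return all rows, then close
        with closing(sqlite3.connect(self._database)) as conn:
            return conn.execute(self._query).fetchall()

    def _save(self, data: Any) -> None:
        # Query datasets are read-only
        raise NotImplementedError("This dataset is read-only")

    def _describe(self) -> Dict[str, Any]:
        # Used by Kedro for logging; never include credentials here
        return {"query": self._query}

    def load(self) -> List[Tuple[Any, ...]]:
        # In Kedro, the base class's public load() wraps _load() with logging/errors
        return self._load()
```

Keeping credentials out of `_describe` matters because Kedro logs dataset descriptions during runs.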

Another example of a dataset definition is given below. It shows how Kedro parameterizes the file path of a Parquet dataset stored in an ADLS Gen2 container. The container name is an environment variable, ${ADLS_CONTAINER}, which takes different values depending on the execution environment (e.g., dev_container, test_container or prod_container).

It also shows that versioning is as simple as adding a “versioned” key with the value “True” to the dataset definition in the catalog.yaml file. This works for most file-based datasets (Parquet files, pickled models, plots, etc.). Versioning makes experiments easier to reproduce because every versioned Kedro dataset is tracked with a unique timestamp.

```yaml
ldu_data:
  type: pandas.ParquetDataSet
  filepath: "abfs://${ADLS_CONTAINER}/01_Primary_(source data)/file_name.parquet"
  credentials: datalake_azure
  versioned: True
```
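
To make the two mechanisms concrete, the sketch below first resolves the `${ADLS_CONTAINER}` placeholder from a per-environment mapping. It then builds the path that a versioned save would write to.

It mimics Kedro's behavior rather than calling it. To our understanding, Kedro stores each version under `<filepath>/<timestamp>/<file name>`, with a UTC timestamp truncated to milliseconds (for example `2023-12-06T10.15.30.123Z`). The helper names and the environment-to-container mapping are hypothetical.

```python
from datetime import datetime, timezone
from string import Template
from typing import Optional

# Hypothetical mapping from execution environment to ADLS container
CONTAINERS = {"dev": "dev_container", "test": "test_container", "prod": "prod_container"}


def resolve_filepath(template: str, env: str) -> str:
    """Substitute ${ADLS_CONTAINER} with the container of the chosen environment."""
    return Template(template).substitute(ADLS_CONTAINER=CONTAINERS[env])


def generate_version(now: Optional[datetime] = None) -> str:
    """UTC timestamp truncated to milliseconds, e.g. 2023-12-06T10.15.30.123Z."""
    now = now or datetime.now(tz=timezone.utc)
    ts = now.strftime("%Y-%m-%dT%H.%M.%S.%fZ")
    return ts[:-4] + ts[-1:]  # drop the last three microsecond digits, keep "Z"


def versioned_path(filepath: str, version: str) -> str:
    """Place each saved version in its own timestamped sub-folder."""
    file_name = filepath.rsplit("/", 1)[-1]
    return f"{filepath}/{version}/{file_name}"
```

String operations are used instead of `pathlib` because `PurePosixPath` would collapse the `//` of the `abfs://` prefix.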

Workflow Orchestration Component (Workflow Orchestration, Reproducibility and Continuous ML training and evaluation)

For workflow orchestration, we use Airflow, which integrates seamlessly with Kedro through the kedro-airflow plugin. With a single command, we generated an Airflow DAG directly from the Kedro pipeline we built. In the end, all we need to do is concentrate on building the data pipeline the Kedro way.
On the 10th day of each month, the Airflow DAG is triggered and the data pipeline runs through the following steps: it ingests and pre-processes input data, builds and selects features, trains the model on the latest data (up to the previous month) and finally produces forecasts.
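
The monthly schedule and data windows above can be expressed with a few date computations. In Airflow, such a schedule could be written as the cron expression `0 0 10 * *` (midnight on the 10th), although the exact time is not stated here.

From a run date, the sketch below derives two things: the last complete month of training data, and the 18 months to forecast. It assumes the horizon starts with the month in which the pipeline runs. The function names are hypothetical.

```python
from datetime import date
from typing import List, Tuple


def shift_month(year: int, month: int, offset: int) -> Tuple[int, int]:
    """Return the (year, month) that is `offset` months after (year, month)."""
    index = year * 12 + (month - 1) + offset
    return index // 12, index % 12 + 1


def to_label(year_month: Tuple[int, int]) -> str:
    """Format a (year, month) pair as YYYY-MM."""
    return f"{year_month[0]:04d}-{year_month[1]:02d}"


def training_cutoff_and_horizon(run_date: date, horizon: int = 18) -> Tuple[str, List[str]]:
    """Last complete month used for training and the months to forecast."""
    cutoff = shift_month(run_date.year, run_date.month, -1)
    # Assumption: the horizon starts with the month in which the pipeline runs
    months = [shift_month(run_date.year, run_date.month, k) for k in range(horizon)]
    return to_label(cutoff), [to_label(ym) for ym in months]
```

Counting months as `year * 12 + month` avoids the off-by-one errors that are easy to make when wrapping across December.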

With Kedro 0.17.5 (the version we used), we faced two main challenges when using Airflow. They are summarized below, with a few tips to overcome them:

  • A potential conflict between Kedro’s logger and Airflow’s own logger, resulting in the error “Task exited with return code Negsignal.SIGKILL”. To solve this problem, the disable_existing_loggers key in the logging.yaml file needs to be set to True, as shown below:
    disable_existing_loggers: True
  • When versioning is enabled in catalog.yaml, Kedro applies a single session ID to all nodes. This is very useful for finding out which run produced which dataset and for linking all datasets in the output folder. With Airflow, however, each DAG step is executed with a new run ID, which makes it difficult to link output datasets together. Our workaround was to create a timestamp environment variable (AIRFLOW_VAR_KEDRO_RUN_ID) and share it across the run IDs of all steps. This way, we could tell which dataset version belongs to which workflow execution. This global run ID was also logged as a parameter in MLflow.

The variable can be created using the command below:

```bash
#!/bin/bash

source /etc/environment

echo "AIRFLOW_VAR_KEDRO_RUN_ID=$(date +%Y-%m-%d_%H.%M.%S)" | sudo tee -a /etc/environment
```

We then pass it to the Python code through the environment key of the docker-compose.yaml file, as shown below:

```yaml
environment:
  - AIRFLOW_VAR_KEDRO_RUN_ID=${AIRFLOW_VAR_KEDRO_RUN_ID}
```
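
On the Python side, each step can then read the shared identifier. Airflow treats environment variables named `AIRFLOW_VAR_<KEY>` as Airflow Variables (here `KEDRO_RUN_ID`), but reading the environment directly also works inside plain Kedro code.

The fallback below is an assumption rather than part of the original setup. When the variable is missing (for example during a local run), it generates an ID in the same `%Y-%m-%d_%H.%M.%S` format as the shell command. The function name is hypothetical.

```python
import os
from datetime import datetime
from typing import Mapping, Optional

RUN_ID_VAR = "AIRFLOW_VAR_KEDRO_RUN_ID"
RUN_ID_FORMAT = "%Y-%m-%d_%H.%M.%S"  # same format as `date +%Y-%m-%d_%H.%M.%S`


def get_global_run_id(environ: Optional[Mapping[str, str]] = None) -> str:
    """Return the run ID shared by all DAG steps, or a fresh one for local runs."""
    env = os.environ if environ is None else environ
    run_id = env.get(RUN_ID_VAR)
    if run_id:
        return run_id
    # Hypothetical fallback: generate an ID in the same format as the shell script
    return datetime.now().strftime(RUN_ID_FORMAT)
```

Accepting an optional mapping instead of always reading `os.environ` keeps the function easy to test.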

Model training infrastructure (Continuous ML training & evaluation and Reproducibility)

We decided to use an Azure Machine Learning compute instance for training, evaluation, and inference. This makes life easier for the maintainers (a younger team with less data science experience), who wanted a managed data science VM with a user interface featuring a terminal, the ability to create notebooks, etc.

Additionally, we used ARM templates to automate the creation and configuration of the compute instance and to make the setup reproducible for other users. Configuration is done with a creation bash script that pre-installs docker-compose and exports the required environment variables. These variables are initially stored in Azure Key Vault and include an Azure DevOps Personal Access Token and Azure IDs (Subscription ID, Application ID, Tenant ID…). They are then read in the Python code without exposing their values in plain text.

Here is the content of the creation_script.sh file:

```bash
#!/bin/bash

source /home/azureuser/.bashrc

# Connect to Azure using the compute instance's managed identity
az login --identity

# Write secrets read from Key Vault to the bashrc file (sed strips the JSON quotes)
echo export AZURE_DEVOPS_PAT=$(az keyvault secret show --name personal-access-token --vault-name <keyvault_name> --query value | sed 's/"//g') | sudo tee -a /home/azureuser/.bashrc
echo export SVP_PASSWORD=$(az keyvault secret show --name SVP-PASSWORD --vault-name <keyvault_name> --query value | sed 's/"//g') | sudo tee -a /home/azureuser/.bashrc
echo export APPLICATION_ID=$(az keyvault secret show --name APPLICATION-ID --vault-name <keyvault_name> --query value | sed 's/"//g') | sudo tee -a /home/azureuser/.bashrc
echo export TENANT_ID=$(az keyvault secret show --name TENANT-ID --vault-name <keyvault_name> --query value | sed 's/"//g') | sudo tee -a /home/azureuser/.bashrc
echo export DOCKER_BUILDKIT=1 | sudo tee -a /home/azureuser/.bashrc
echo export COMPOSE_DOCKER_CLI_BUILD=1 | sudo tee -a /home/azureuser/.bashrc

# Set default editor for crontab
echo export EDITOR=nano | sudo tee -a /home/azureuser/.bashrc

# Installation of docker-compose
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
```

We also used Docker to package our OS environment. The environment variables initially exported using the creation script are passed to our Python code using the ARG and ENV instructions of the Dockerfile as follows:

```dockerfile
ARG SUBSCRIPTION_ID
ENV SUBSCRIPTION_ID=${SUBSCRIPTION_ID}
ARG AZURE_DEVOPS_PAT
ENV AZURE_DEVOPS_PAT=${AZURE_DEVOPS_PAT}
ARG APPLICATION_ID
ENV APPLICATION_ID=${APPLICATION_ID}
ARG SVP_PASSWORD
ENV SVP_PASSWORD=${SVP_PASSWORD}
ARG TENANT_ID
ENV TENANT_ID=${TENANT_ID}
ARG CI_WORKSPACE
ENV WORKSPACE=${CI_WORKSPACE}
ARG CI_RESOURCE_GROUP
ENV RESOURCE_GROUP=${CI_RESOURCE_GROUP}
```
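
Inside the container, `os.environ.get` silently returns `None` when a variable is missing. This tends to surface much later as an obscure authentication error. A small helper that fails fast and reports the names of the missing variables is one way to guard against that.

The helper name is hypothetical. The variable names are the ones declared with `ENV` in the Dockerfile.

```python
import os
from typing import Dict, Iterable, Mapping, Optional

# Names declared with ENV in the Dockerfile
REQUIRED_VARS = ("SUBSCRIPTION_ID", "AZURE_DEVOPS_PAT", "APPLICATION_ID",
                 "SVP_PASSWORD", "TENANT_ID", "WORKSPACE", "RESOURCE_GROUP")


def require_env(names: Iterable[str],
                environ: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Return the requested variables, raising if any is missing or empty."""
    env = os.environ if environ is None else environ
    wanted = list(names)
    missing = [name for name in wanted if not env.get(name)]
    if missing:
        # Only names are reported, never values, so secrets stay out of logs
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name] for name in wanted}
```

A typical call at the start of a hook would be `settings = require_env(REQUIRED_VARS)`.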

Monitoring component (Continuous Monitoring, feedback loop)

Model tuning and evaluation are run manually when Kedro hooks raise alerts because performance KPIs have fallen below a defined threshold.
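
The alerting hook itself is not shown. As an illustration only, the comparison such a hook might perform on the evaluation metrics can be sketched as a plain function. The KPI names and threshold values are hypothetical. The returned messages could then feed a notification such as the MS Teams message shown below.

```python
from typing import Dict, List, Optional

# Hypothetical thresholds: minimum accuracy and maximum absolute bias
DEFAULT_MIN = {"forecast_accuracy": 0.70}
DEFAULT_MAX_ABS = {"forecast_bias": 0.05}


def kpi_breaches(
    metrics: Dict[str, float],
    minimums: Optional[Dict[str, float]] = None,
    max_abs: Optional[Dict[str, float]] = None,
) -> List[str]:
    """Return a readable alert for every KPI outside its threshold."""
    minimums = DEFAULT_MIN if minimums is None else minimums
    max_abs = DEFAULT_MAX_ABS if max_abs is None else max_abs
    alerts = []
    for name, minimum in minimums.items():
        # Accuracy-like KPIs must stay above their minimum
        if name in metrics and metrics[name] < minimum:
            alerts.append(f"{name}={metrics[name]:.3f} below {minimum:.3f}")
    for name, limit in max_abs.items():
        # Bias-like KPIs must stay close to zero in either direction
        if name in metrics and abs(metrics[name]) > limit:
            alerts.append(f"|{name}|={abs(metrics[name]):.3f} above {limit:.3f}")
    return alerts
```

Bias is checked on its absolute value because both over- and under-forecasting should trigger a review.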

Moreover, to be notified when the pipeline execution fails, we implemented a hook to send messages to an Incoming Webhook configured on a shared MS Teams channel.

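```python
import os
from datetime import datetime

import pymsteams
from kedro.framework.hooks import hook_impl


class MSTeamsWebhookMessage:
    """Namespace for grouping all notification hooks with MS Teams together."""

    @hook_impl
    def on_pipeline_error(self, error: Exception) -> None:
        """Hook implementation for sending MS Teams notifications on pipeline failures."""
        msteams_webhook = os.environ.get("MSTEAMS_WEBHOOK")
        if not msteams_webhook:
            # No webhook configured: nothing to notify
            return

        # Initialize the connector card with the webhook URL
        teams_message = pymsteams.connectorcard(msteams_webhook)

        # Write the error message, including the exception type
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
        teams_message.text(
            f"**{timestamp}** : ML pipeline execution. Status : **ERROR** "
            f"({type(error).__name__}) \nCheck the logs"
        )

        # Send the message
        teams_message.send()
```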
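
If adding the `pymsteams` dependency is undesirable, the same notification can be sent with the standard library alone, since a Teams Incoming Webhook accepts a JSON body with a `text` field. This is a minimal sketch. The function names are hypothetical, and the webhook URL is still read from the `MSTEAMS_WEBHOOK` variable used in the hook.

```python
import json
import os
import urllib.request
from datetime import datetime
from typing import Dict, Optional


def build_failure_message(now: Optional[datetime] = None) -> Dict[str, str]:
    """Build the JSON body of a simple text message for an Incoming Webhook."""
    timestamp = (now or datetime.now()).strftime("%Y-%m-%d %H:%M")
    return {"text": f"**{timestamp}** : ML pipeline execution. Status : **ERROR** \nCheck the logs"}


def post_to_teams(payload: Dict[str, str], webhook_url: Optional[str] = None) -> int:
    """POST the payload to the webhook and return the HTTP status code."""
    url = webhook_url or os.environ["MSTEAMS_WEBHOOK"]
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

Separating message construction from sending makes it possible to test the message format without network access.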

ML Metadata Stores (ML metadata tracking and Versioning)

For experiment tracking, we coupled Kedro and MLflow using Kedro hooks, a handy Kedro feature for executing code at specific points, for example before a pipeline run or after a node has run. As previously stated, we used hooks to log the global AIRFLOW_VAR_KEDRO_RUN_ID run ID. We also used them to log the model’s performance metrics, its best hyperparameters and the list of features selected by the feature selection algorithm.

One of the main advantages of MLflow is that Azure Machine Learning supports it natively. An Azure ML workspace can serve as the MLflow tracking server, so all parameters and metrics of your experiments are logged directly in the Azure Machine Learning user interface. In our case, we also log a text file with the names of the selected features for even more reproducibility.
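
The metric, hyperparameter and feature-list logging described above can be grouped in one helper. In this sketch, the `tracker` argument is duck-typed. Passing the `mlflow` module itself should work, because `mlflow.log_metrics`, `mlflow.log_params` and `mlflow.log_text` accept these arguments in recent MLflow releases. A small fake object makes the function testable without a tracking server. The helper name and the `selected_features.txt` artifact name are hypothetical.

```python
from typing import Any, Dict, Iterable, Protocol


class Tracker(Protocol):
    """Subset of the MLflow fluent API used below."""

    def log_metrics(self, metrics: Dict[str, float]) -> Any: ...

    def log_params(self, params: Dict[str, Any]) -> Any: ...

    def log_text(self, text: str, artifact_file: str) -> Any: ...


def log_training_summary(
    tracker: Tracker,
    metrics: Dict[str, float],
    best_params: Dict[str, Any],
    selected_features: Iterable[str],
) -> None:
    """Log performance metrics, best hyperparameters and the selected features."""
    tracker.log_metrics(metrics)
    tracker.log_params(best_params)
    # One feature name per line, stored as a run artifact for reproducibility
    tracker.log_text("\n".join(selected_features), "selected_features.txt")
```

Inside an active MLflow run, for example from a Kedro `after_node_run` hook, it would be called as `log_training_summary(mlflow, metrics, best_params, features)`.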

Tracking can be implemented as follows in the hooks.py file. Note that the values of the previously defined environment variables are not exposed in the Python code.

```python
import os
from typing import Any, Dict

import git
import mlflow
from azureml.core import Experiment, Workspace
from azureml.core.authentication import ServicePrincipalAuthentication
from kedro.framework.hooks import hook_impl


class ModelTrackingHooks:
    """Namespace for grouping all model-tracking hooks with MLflow together."""

    @hook_impl
    def before_pipeline_run(self, run_params: Dict[str, Any]) -> None:
        """Hook implementation to start an MLflow run named after
        the global run ID shared by all Airflow steps.
        """
        # Get environment variables (names match the ENV instructions of the Dockerfile)
        workspace_name = os.environ.get("WORKSPACE")
        subscription_id = os.environ.get("SUBSCRIPTION_ID")
        resource_group = os.environ.get("RESOURCE_GROUP")
        tenant_id = os.environ.get("TENANT_ID")
        svc_pr_password = os.environ.get("SVP_PASSWORD")
        application_id = os.environ.get("APPLICATION_ID")

        # Get the current git commit hash to link the run to the code version
        repo = git.Repo(search_parent_directories=True)
        git_sha = repo.head.object.hexsha

        # Service principal authentication
        svc_pr = ServicePrincipalAuthentication(
            tenant_id=tenant_id,
            service_principal_id=application_id,
            service_principal_password=svc_pr_password,
            _enable_caching=False,
        )
        # Get the Azure ML workspace
        ws = Workspace.get(
            name=workspace_name,
            subscription_id=subscription_id,
            resource_group=resource_group,
            auth=svc_pr,
        )
        # Use the workspace as the MLflow tracking server
        mlflow.set_tracking_uri(ws.get_mlflow_tracking_uri())

        # Create (or reuse) an Azure ML experiment in the workspace
        experiment = Experiment(workspace=ws, name="kedro-mlflow-airflow-experiment")
        mlflow.set_experiment(experiment.name)
        mlflow.start_run(run_name=os.environ.get("AIRFLOW_VAR_KEDRO_RUN_ID"))
        mlflow.log_params(run_params)
        mlflow.log_param("git_sha", git_sha)

    @hook_impl
    def after_pipeline_run(self) -> None:
        """Close the MLflow run when the pipeline succeeds."""
        mlflow.end_run()

    @hook_impl
    def on_pipeline_error(self) -> None:
        """Mark the MLflow run as failed when the pipeline raises."""
        mlflow.end_run(status="FAILED")
```

Key takeaways regarding the benefits of Kedro

Kedro offers many advantages for streamlining work between data engineers and data scientists. One of its main assets is the separation of concerns: the inputs and outputs of pipeline nodes are declared in a YAML configuration file (catalog.yaml) rather than in the code. Similarly, credentials are declared separately in another configuration file (credentials.yaml). The standard structure of a Kedro project also makes teamwork easier, since everyone can collaborate on the same code base.
As shown in this article, the way Kedro handles parameterization across all *.yaml configuration files also helps when dealing with multiple execution environments or credentials.

At IRIS by Argon & Co, for example, this enabled us to deploy a forecasting model first on 2 pilot markets in 4 months (including the whole pipeline setup time), then on 20 markets in just 3 months, and then on another 20 markets in just 2 months.

This acceleration is largely due to the implementation of a robust pipeline using Kedro.

To go further:

An even more robust alternative is to use an Azure Machine Learning compute cluster that pulls the latest Docker image from Azure Container Registry for training.

At the time of writing, it also seems possible to translate a Kedro pipeline into an Azure Machine Learning pipeline without much effort using the kedro-azureml plugin. This makes Kedro even more integrated with Azure ML.
