ML Pipelines in Azure Machine Learning the right way

A code example to get you up-and-running quickly

Coussement Bruno
datamindedbe
8 min read · Jun 7, 2021

Photo by Florian Wächter on Unsplash

Note: this article was written when only the Python SDK v1 was available. SDK v2 has since been released and simplifies the API.

The official Azure Machine Learning Studio documentation, the Python SDK reference and the notebook examples are often out-of-date, or don’t cover all important aspects, or don’t provide a compelling end-to-end example. This guide is an attempt to cover the necessary basics, hopefully accelerating you in building a machine learning pipeline on Azure.

Azure ML Studio

Azure ML Studio (AML) is an Azure service for data scientists to build, train and deploy models. Data engineers on the other hand can use it as a starting point to industrialise ML models.

According to the MLOps best practices from Google and Microsoft, you actually want to build a pipeline of defined steps (data preparation, hyper-parameter tuning, model training, model evaluation) instead of merely developing “a model”. Although this approach requires more effort than an ad-hoc notebook flow, some clear benefits make up for it:

  • rapid, structured end-to-end experimentation,
  • separation of concerns,
  • easier sharing of one or more components across projects,
  • more possibilities for automation.

Getting started

Before diving into code and configurations, make sure you have checked off the pre-requisites. This article does not cover them in detail, because the linked documentation should be enough to complete them.

Pre-requisites

  • Have an account on Azure.
  • Created an Azure ML Workspace.
  • Created a basic Azure ML Compute cluster.
    Three nodes of a cheap instance type (e.g. Standard_DS2_v2) are more than enough for this tutorial.
  • Created an Azure ML Datastore for the resource your data resides on (typically a Container in a Storage Account).
  • Ran pip install azureml-core azureml-pipeline in your development environment.

We are going to build a two step pipeline:

  1. data preparation step,
  2. model training step.

This keeps things simple, while still showing how to feed the output of one step into another.

All the code can be found here. In order to make it run, you will need to add your own workspace, datastore and compute config.

#1. Create a reference to the Azure ML Workspace

The Workspace is the fundamental Azure ML resource. It is tied to a subscription and resource group. You would typically have a single workspace per project.

from azureml.core import Workspace

ws = Workspace.get(
    name="my_workspace",
    subscription_id="111",
    resource_group="my_resource_group",
)

Note that you can also load the configuration from a file using Workspace.from_config(). Save the file as <project-root>/.azureml/config.json.
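As a minimal sketch of what that file could contain: the snippet below writes a config.json with the three keys Workspace.from_config() reads (subscription_id, resource_group, workspace_name). The helper name write_workspace_config is hypothetical, the values are the placeholders used above, and a temporary directory stands in for your project root.

```python
import json
import tempfile
from pathlib import Path


def write_workspace_config(project_root: Path, subscription_id: str,
                           resource_group: str, workspace_name: str) -> Path:
    """Write <project_root>/.azureml/config.json and return its path.

    Hypothetical helper, for illustration only.
    """
    config_dir = project_root / ".azureml"
    config_dir.mkdir(parents=True, exist_ok=True)
    config_path = config_dir / "config.json"
    config = {
        "subscription_id": subscription_id,
        "resource_group": resource_group,
        "workspace_name": workspace_name,
    }
    config_path.write_text(json.dumps(config, indent=2))
    return config_path


# A temporary directory stands in for the project root in this demo.
demo_root = Path(tempfile.mkdtemp())
config_path = write_workspace_config(
    demo_root, "111", "my_resource_group", "my_workspace"
)
loaded_config = json.loads(config_path.read_text())

# With azureml-core installed, the workspace could then be loaded with:
#   from azureml.core import Workspace
#   ws = Workspace.from_config(path=str(config_path))
```

Keeping these values in a file rather than in code makes it easier to point the same scripts at a different workspace per environment.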

#2. Create a reference to the Azure ML Datastore

A datastore is an AML-specific component that abstracts away the Azure resource the data is stored on. It allows for cross-subscription or cross-resource-group data access on Azure (useful in an enterprise context). The main advantage is that it takes care of authentication for you (after the initial setup, minimal key management is required).

You don’t necessarily need it. If you don’t, be ready to pollute your code with authentication/authorization snippets. More code = more liability!

from azureml.core.datastore import Datastore

datastore = Datastore.get(
    workspace=ws,
    datastore_name="my_datastore",
)

#3. Register input files saved on blob store as an Azure ML Dataset

Azure ML Datasets allow you, for example, to register files in a blob store as a File (images, texts, sound recordings,…) or Tabular (parquet, csv, json,…) dataset. Once registered, you can version, monitor, profile and quick-preview the data.

I do recommend using them, even though you don’t absolutely need them. You could interact with the blob store directly from your pipeline step. The downside is that you’ll miss some built-in features (dataset monitor for data drift, etc). In the end, an Azure ML Dataset is basically a pointer to your files that keeps some metadata on them.

Tip: If registering large tables (>100M rows and/or >100 cols) from partitioned parquet files as a Tabular Dataset causes trouble, use the File Dataset type instead. The latter type doesn’t perform a data validation step upon creation.

Registering input data as an AML Dataset typically happens as the last step of an ingestion process. For simplicity, I assume you have a Pandas DataFrame that you would like to register as a Tabular Dataset called “my_raw_dataset”.

The basic pattern is to save it locally, upload the files to a datastore and register it.

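import os
from azureml.core import Dataset

df = _  # assumption: custom code here to obtain df

local_path = "path/to/local/parquet/files"
target_path = "path/to/upload/on/blob"

# Write a parquet file inside the local directory (the file name is arbitrary)
os.makedirs(local_path, exist_ok=True)
df.to_parquet(os.path.join(local_path, "data.parquet"), index=False)

# Upload the directory, then register the uploaded files as a Tabular Dataset
data_reference = datastore.upload(src_dir=local_path, target_path=target_path, overwrite=True)
dataset = Dataset.Tabular.from_parquet_files(path=data_reference)
dataset.register(workspace=ws, name="my_raw_dataset")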

Side note: at the time of writing, I discovered a function in preview that saves you from manually saving, uploading and parsing your dataframe.

# target is a (datastore, path) tuple rather than a plain string
Dataset.Tabular.register_pandas_dataframe(dataframe=df, target=(datastore, target_path), name="my_raw_dataset")

#4. Create Python modules, but separate Azure ML specific code from the rest

Putting all AML-specific code in an azureml folder avoids polluting your vendor-neutral code. This makes both the general and the Azure-specific parts easier to reuse.

src/
└── my_awesome_project
    ├── data
    │   └── clean_input_data.py
    ├── ml
    │   └── model.py
    └── azureml
        ├── aml_clean_data.py
        ├── aml_train_model.py
        ├── register_dataframe_as_dataset.py
        └── create_and_trigger_pipeline.py

The aml_* Python scripts will be executed by the AML compute cluster. The other two serve as scripts to trigger remote execution from your local laptop.
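To illustrate why this separation pays off, here is a minimal sketch in plain Python. It is not the project's actual code: the clean function below is a simplified stand-in that works on lists of dictionaries instead of DataFrames, and run_clean_step is a hypothetical wrapper playing the role of an aml_* script. The point is that the vendor-neutral logic can be tested locally without any Azure dependency.

```python
from typing import Dict, List, Optional

Record = Dict[str, Optional[float]]


def clean(records: List[Record]) -> List[Record]:
    """Vendor-neutral logic: drop records that contain a missing value."""
    return [r for r in records if all(v is not None for v in r.values())]


def run_clean_step(input_datasets: Dict[str, List[Record]],
                   input_name: str) -> List[Record]:
    """Thin wrapper: the only place that knows about named inputs.

    In a real aml_* script, the inputs would come from the Azure ML run
    context instead of a plain dictionary.
    """
    return clean(input_datasets[input_name])


# Local test with fake inputs, no Azure ML needed.
fake_inputs = {
    "my_raw_dataset": [{"demand": 12.0}, {"demand": None}, {"demand": 7.0}],
}
cleaned = run_clean_step(fake_inputs, "my_raw_dataset")
```

The wrapper stays a few lines long, so switching to another platform means rewriting only that thin layer.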

The remotely executed script aml_clean_data.py reads an input Azure ML dataset, cleans it, and saves it to a mounted path. Registering the output data happens automatically if the script described here ran successfully (see step 5).

A Run represents a single trial of an experiment. Runs are used to monitor the asynchronous execution of a trial, log metrics and store output of the trial, and to analyze results and access artifacts generated by the trial.

import os
from azureml.core import Run
from my_awesome_project.data.clean_input_data import clean

run = Run.get_context()

# The key must match the name given to the input in the step definition (see step 6)
raw_df = run.input_datasets["my_raw_dataset"].to_pandas_dataframe()

clean_df = clean(raw_df)

# Write a parquet file inside the output directory (the file name is arbitrary)
mounted_output_dir = run.output_datasets["my_clean_dataset"]
os.makedirs(mounted_output_dir, exist_ok=True)
clean_df.to_parquet(os.path.join(mounted_output_dir, "clean.parquet"))

The other remotely executed script, aml_train_model.py, reads the cleaned dataset, trains a model, and registers it. Note that everything in ./outputs and ./logs will be uploaded to the run. Behind the scenes, the trained model will be saved on a blob store managed by Azure ML.

To pass model parameters, an easy option is to use an ArgumentParser to parse command line arguments that will be set by Azure ML when running this script.

from argparse import ArgumentParser
from azureml.core import Run
from my_awesome_project.ml.model import train_model

run = Run.get_context()

ap = ArgumentParser()
ap.add_argument("--epochs", type=int)  # command line values arrive as strings
args = ap.parse_args()

clean_df = run.input_datasets["my_clean_dataset"].to_pandas_dataframe()
trained_model = train_model(data=clean_df, epochs=args.epochs)

trained_model.save("./outputs/model")  # ./outputs is important: its content is uploaded to the run
run.register_model(model_name="my_model", model_path="outputs/model")
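The argument parsing can be checked locally before submitting anything to Azure ML. The sketch below wraps the parser in a function so it can be called with an explicit argument list. The function name parse_training_args, the --learning-rate parameter and the default values are assumptions added for illustration; they are not part of the original script.

```python
from argparse import ArgumentParser, Namespace
from typing import List, Optional


def parse_training_args(argv: Optional[List[str]] = None) -> Namespace:
    """Parse training parameters; argv=None falls back to sys.argv[1:]."""
    parser = ArgumentParser(description="Training parameters (illustrative)")
    # type=int converts the raw string "5" into the integer 5
    parser.add_argument("--epochs", type=int, default=1)
    # Hypothetical extra parameter, only to show a float conversion
    parser.add_argument("--learning-rate", type=float, default=0.01)
    # parse_known_args tolerates extra arguments instead of exiting
    args, _unknown = parser.parse_known_args(argv)
    return args


# Same argument list as the one passed to the train step in step 6
args = parse_training_args(["--epochs", "5"])
defaults = parse_training_args([])
```

Without type=int, args.epochs would be the string "5", which is an easy mistake to miss until the training code fails on the cluster.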

#5. Define inputs as dataset named-references, and outputs as OutputFileDatasetConfig

This is the most confusing part of getting started if you go through the notebook examples. You will find many classes that basically do the same thing. Their class structure feels messy and unpythonic. To convince yourself of that, take a look at all the possible input types a PythonScriptStep can accept.

<azureml.pipeline.core.graph.InputPortBinding,
azureml.data.data_reference.DataReference,
azureml.pipeline.core.PortDataReference,
azureml.pipeline.core.builder.PipelineData,
azureml.pipeline.core.pipeline_output_dataset.PipelineOutputFileDataset,
azureml.pipeline.core.pipeline_output_dataset.PipelineOutputTabularDataset,
azureml.data.dataset_consumption_config.DatasetConsumptionConfig>

Or the output types:

<azureml.pipeline.core.builder.PipelineData,
azureml.data.output_dataset_config.OutputDatasetConfig,
azureml.pipeline.core.pipeline_output_dataset.PipelineOutputFileDataset,
azureml.pipeline.core.pipeline_output_dataset.PipelineOutputTabularDataset,
azureml.pipeline.core.graph.OutputPortBinding>

Indeed, go figure it out 😉.

An easy way is to use Dataset.get_by_name to create an input data reference. Output or intermediary data is best referenced by creating an OutputFileDatasetConfig object.

For example, the raw and clean datasets are referenced in create_and_trigger_pipeline.py as follows:

from azureml.core import Dataset
from azureml.data.output_dataset_config import OutputFileDatasetConfig

# The name must match the one used when registering the dataset (see step 3)
input_data = Dataset.get_by_name(workspace=ws, name="my_raw_dataset")

clean_data = (
    OutputFileDatasetConfig(
        name="my_clean_dataset",
        destination=(datastore, "path/on/blob/to/write/clean/data/to"),
    )
    .as_upload(overwrite=True)
    .read_parquet_files()  # To promote File to Tabular Dataset
    .register_on_complete(name="my_clean_dataset")
)

Note: if you are working with tabular data and you wish to consume the output as a Tabular Dataset, you need to convert the default File Dataset to a Tabular one. This is done by calling read_parquet_files() on the OutputFileDatasetConfig.

#6. Create and configure PythonScriptStep

In step 4 we defined the scripts that we would like to run on a compute platform managed by Azure ML. Again, there are multiple options here. For all Python scenarios, I recommend using the PythonScriptStep. The exception is if you need pyspark; in that case, consider using the DatabricksStep.

For the runconfig parameter, look at the documentation of RunConfiguration in order to set dependencies (and others) correctly for your project.

An example of step definitions in create_and_trigger_pipeline.py could look like:

from pathlib import Path

from azureml.core.compute import ComputeTarget
from azureml.core.runconfig import RunConfiguration
from azureml.pipeline.steps import PythonScriptStep

import my_awesome_project
from my_awesome_project.azureml import aml_clean_data, aml_train_model

# dataset references here (see step 5)

# Script names must be relative to the source directory
src_dir = Path(my_awesome_project.__file__).parent.parent
clean_mdl_path = Path(aml_clean_data.__file__).relative_to(src_dir)
train_mdl_path = Path(aml_train_model.__file__).relative_to(src_dir)

clean_step = PythonScriptStep(
    name="clean data",
    script_name=str(clean_mdl_path),
    source_directory=str(src_dir),
    runconfig=RunConfiguration(),
    # Passed as a named Tabular input (no mount), because the script
    # calls to_pandas_dataframe() on it
    inputs=[input_data.as_named_input("my_raw_dataset")],
    outputs=[clean_data],
    compute_target=ComputeTarget(workspace=ws, name="small_cluster"),
    allow_reuse=True,
)

train_step = PythonScriptStep(
    name="train_model",
    script_name=str(train_mdl_path),
    source_directory=str(src_dir),
    runconfig=RunConfiguration(),
    arguments=["--epochs", "5"],
    # The input name must match the key the training script reads
    inputs=[clean_data.as_input(name="my_clean_dataset")],
    outputs=[],
    compute_target=ComputeTarget(workspace=ws, name="small_cluster"),
    allow_reuse=True,
)
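The path handling in the step definitions can be tried out in isolation. The sketch below reproduces it with hypothetical file locations under /repo/src (an assumption for illustration; in the real script the locations come from the __file__ attribute of the imported modules). The helper name script_name_for is also hypothetical.

```python
from pathlib import PurePosixPath
from typing import Tuple


def script_name_for(module_file: str, package_init_file: str) -> Tuple[str, str]:
    """Return (source_directory, script_name) for a module inside a package.

    The source directory is the folder that contains the package, and the
    script name is the module path relative to that folder.
    """
    src_dir = PurePosixPath(package_init_file).parent.parent
    script_name = PurePosixPath(module_file).relative_to(src_dir)
    return str(src_dir), str(script_name)


# Hypothetical locations, standing in for module.__file__ values
source_directory, script_name = script_name_for(
    module_file="/repo/src/my_awesome_project/azureml/aml_clean_data.py",
    package_init_file="/repo/src/my_awesome_project/__init__.py",
)
```

Using the folder above the package as the source directory means the whole package is shipped to the compute cluster, so the aml_* scripts can import from my_awesome_project.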

#7. Submit your pipeline to an Azure ML Experiment and trigger a run

Finally, submit your pipeline to an experiment and trigger a run. In create_and_trigger_pipeline.py, this looks as follows:

from azureml.core.experiment import Experiment
from azureml.pipeline.core.pipeline import Pipeline
exp = Experiment(workspace=ws, name="my_experiment")
pipeline = Pipeline(ws, steps=[clean_step, train_step])
run = pipeline.submit(experiment_name=exp.name)
run.wait_for_completion(raise_on_error=True)

Visit your workspace through the Azure portal, and you should see a running pipeline.

Next steps? More steps!

After getting a basic flow up-and-running, you should be able to add more steps in a similar way. Think about data validation, model validation, more data cleaning, etc.

Also think about abstracting away all the boilerplate code that I left here explicitly for educational purposes.

Tip: Log any performance metric, tables, visualisations to the experiment run. For example, let’s say you’re predicting bicycle-rent demand, and business wants you to focus on a few renting points around train stations. In order to easily track progress on that, you could log the mean absolute error of the predictions made around those high-interest points.
Look at the documentation of Run for reference.

mae = mae_around_train_stations(model, X_test, y_test)
run.log(name="mae_around_stations", value=mae, description="MAE predictions around train stations")
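The metric function itself is left to you. As a rough sketch of what it could look like, the version below computes the mean absolute error only over the samples flagged as being near a train station. Everything here is an assumption for illustration: the simplified signature (a predict callable instead of a model object), the near_train_station flag, the docks feature and the toy data.

```python
from typing import Callable, Dict, List, Union

Sample = Dict[str, Union[float, bool]]


def mae_around_train_stations(
    predict: Callable[[Sample], float],
    X_test: List[Sample],
    y_test: List[float],
    station_flag: str = "near_train_station",
) -> float:
    """Mean absolute error restricted to samples near a train station."""
    errors = [
        abs(predict(sample) - actual)
        for sample, actual in zip(X_test, y_test)
        if sample[station_flag]
    ]
    if not errors:
        raise ValueError("no test samples near a train station")
    return sum(errors) / len(errors)


# Toy data: a stand-in "model" that predicts 10 rentals per dock
X_test = [
    {"docks": 2.0, "near_train_station": True},
    {"docks": 1.0, "near_train_station": False},
    {"docks": 3.0, "near_train_station": True},
]
y_test = [18.0, 50.0, 33.0]
mae = mae_around_train_stations(lambda s: 10.0 * s["docks"], X_test, y_test)

# Inside a remotely executed script, the value could then be logged with:
#   run.log(name="mae_around_stations", value=mae)
```

The second sample has a large error but is ignored, because it is not near a train station; only the two flagged samples contribute to the metric.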

Industrialisation

Once you have a basic pipeline working and you get consistent performance, it is time to think about industrialisation. For small use cases, it is perfectly fine to just promote this pipeline to a production context. For larger projects, think about how to integrate this flow with stable engineering flows. Avoid creating a pipeline jungle by introducing yet another pipelining tool.

In order to increase stability and observability, think about the following:

  • integrate with a central orchestrator of choice (Airflow, Azure Data Factory, etc),
  • set up a CI/CD flow,
  • set up data and model monitoring correctly.

Azure obviously has an offering for this. If your organisation has a “go full Azure managed services” strategy, then go ahead. Otherwise, substitute each mentioned service with a cloud-agnostic one (for example, Azure Data Factory with Airflow). Do the math about the total cost of managing these solutions.

Conclusion

This article presented a basic example of how to use Azure ML pipelines in a batch context. It avoids the use of the deprecated Azure ML Python SDK components.

A proof-of-concept in your organisation?

Contact Data Minded through https://www.linkedin.com/company/data-minded/ or https://www.dataminded.be/.

Acknowledgements

I would like to thank Data Minded for having offered me learning opportunities at different clients.
