Scheduling & Deploying ML Pipeline Endpoints in Azure Machine Learning

By Ashley Peterson and Emily Huskins

In our previous post, we created a model training pipeline to classify Airbnb listings based on their dwelling attributes. After creating a Machine Learning (ML) Pipeline in Azure, the next step is to deploy it. With the training pipeline in place, we wanted to set a retraining schedule and create a second pipeline to generate predictions whenever a new file is added to Blob Storage.

First, we created the second pipeline, the model prediction pipeline, which would call the model we previously trained. Next, we published the two pipelines to REST endpoints. Then, we created schedules for each pipeline — one time-based and one change-based.

Creating the Model Prediction Pipeline

The model prediction pipeline has two steps. The first is the data preparation step, which mirrors the data preparation step from the model training pipeline to ensure the same features are created for predictions. The second step calls the model saved in the Workspace and makes predictions on the new data. We are storing and reading our data directly from Azure Blob Storage. Within a Workspace, you can also register your data and read datasets from the Workspace instead of connecting directly to a storage service.
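If you prefer the registered-dataset route, a minimal sketch might look like the following. The dataset name is a hypothetical choice; the datastore name and file path match the ones used later in this post.

from azureml.core import Workspace, Datastore, Dataset

ws = Workspace.from_config()
dstore = Datastore.get(ws, 'airbnb')

# Register the CSV in Blob Storage as a named, versioned dataset
listings_ds = Dataset.Tabular.from_delimited_files(
    path=(dstore, 'listings_subset.csv'))
listings_ds = listings_ds.register(workspace=ws,
                                   name='airbnb_listings',  # hypothetical name
                                   create_new_version=True)

# Any pipeline step can then load the data by dataset name instead of a storage path
airbnb = Dataset.get_by_name(ws, 'airbnb_listings').to_pandas_dataframe()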

Step 1: Data Prep

We used our data_prep script from the training pipeline as a starting point. The challenge we ran into with the Airbnb data was that some of the categorical features had many possible values. For example, property type had 28 possible categories. Not every category for property type was represented in the new data file, so we needed to update our script to perform one-hot encoding across all the possible categorical values.
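A minimal sketch of that idea, assuming a hypothetical property_type column and a hard-coded list of the known categories:

import pandas as pd

# Full list of known categories (truncated here; 28 values in the real data)
ALL_PROPERTY_TYPES = ['Apartment', 'House', 'Condominium', 'Loft']

# Casting to a fixed categorical dtype makes get_dummies emit a column for
# every known category, even ones missing from the new file
airbnb['property_type'] = pd.Categorical(airbnb['property_type'],
                                         categories=ALL_PROPERTY_TYPES)
airbnb = pd.get_dummies(airbnb, columns=['property_type'])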

To allow for scheduling the pipeline later on, we created a DataPath parameter as a dynamic input to the pipeline. We will explain the DataPath parameter in the change-based scheduling section. For testing the prediction pipeline, the DataPath parameter is a static input that passes the path of our CSV file in Blob Storage.

data_path_input = (datapath1_pipeline_param,
                   DataPathComputeBinding(mode='mount'))

data_prep_step = PythonScriptStep(name='Data Preparation',
                                  script_name='data_prep_predictions.py',
                                  arguments=["--input_data_path", data_path_input,
                                             "--string_parameter", string_pipeline_param,
                                             "--output_data", cleansed_data],
                                  inputs=[data_path_input],
                                  outputs=[cleansed_data],
                                  compute_target=compute_target,
                                  runconfig=run_config,
                                  allow_reuse=True)

Step 2: Model Predictions

Using the model registered to the Workspace from the AutoML training pipeline, we made predictions on a new set of data and output the results. The code below pulls the model registered in the Workspace and returns the predictions to a Blob container. The output directory is an input to this step and defines the path in Blob Storage where the results are stored.

# Load the model registered in the Workspace and score the new data
reg_model_path = Model.get_model_path(args.model_name)
best_model = joblib.load(reg_model_path)
predictions = best_model.predict(x)

# Write the predictions to the output path in Blob Storage
output_path = os.path.join(args.output_dir, 'predictions_' +
                           str(datetime.datetime.now().date()))
airbnb.to_csv(output_path + '.csv', index=False)

Publishing the Pipelines to REST Endpoints

After creating the model prediction pipeline, we leveraged the Python SDK to create a REST endpoint. Within the Workspace, you can create endpoints for either a model or a pipeline depending on how you would like to use the output. Since we were not creating a web app to make real-time predictions, we chose to create a pipeline endpoint. You also have the option to enable versioning for your pipeline, so that the endpoint remains the same even if you update the pipeline. Creating the endpoint allows us to run the pipeline outside of the Jupyter Notebook that was used to create it.

published_prediction = pipeline.publish(
    name='airbnb_automl_classification_predictions',
    description='airbnb automl classification model prediction pipeline',
    version='2')
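For the versioning option mentioned above, the SDK also provides a PipelineEndpoint, which keeps one stable endpoint in front of multiple published pipeline versions. A minimal sketch, with a hypothetical endpoint name:

from azureml.pipeline.core import PipelineEndpoint

# Publish the pipeline behind a named endpoint (endpoint name is hypothetical)
pipeline_endpoint = PipelineEndpoint.publish(
    workspace=ws,
    name='airbnb_prediction_endpoint',
    pipeline=published_prediction,
    description='Versioned endpoint for the Airbnb prediction pipeline')

# When you republish an updated pipeline, add it as the endpoint's new default
# so callers keep using the same endpoint URL
pipeline_endpoint.add_default(published_prediction)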

Calling the Endpoints: On-Demand and Scheduling

Once a pipeline is published, you can call that pipeline to execute a run. On-demand and scheduled are the two options for calling the endpoints. With on-demand, the pipeline endpoint is active and can be called whenever you want to run it. Scheduled pipelines run on a set schedule or in response to a change in the data source.

On-Demand

To test our endpoint, we used a new notebook to call the pipeline from the Workspace and submit a run. The code to run a pipeline could be used in other applications to trigger a run and return results. Using the .submit() function will run the pipeline on-demand when that line is executed.

from azureml.core import Experiment
from azureml.pipeline.core import PublishedPipeline

experiment_name2 = 'endpoint-run-airbnb-prediction'
experiment2 = Experiment(ws, experiment_name2)
prediction_pipeline = PublishedPipeline.get(ws, id=pipeline.id)
run_prediction_pipeline = experiment2.submit(prediction_pipeline)
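The same published pipeline can also be triggered over plain HTTP from outside the SDK, for example from another application. A minimal sketch, assuming Azure AD interactive authentication and the experiment name used above:

import requests
from azureml.core.authentication import InteractiveLoginAuthentication

# Acquire an Azure AD token for the REST call
auth_header = InteractiveLoginAuthentication().get_authentication_header()

# POST to the published pipeline's REST endpoint to queue a run
response = requests.post(prediction_pipeline.endpoint,
                         headers=auth_header,
                         json={"ExperimentName": experiment_name2})
response.raise_for_status()
print("Submitted run:", response.json().get("Id"))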

Scheduling

Instead of having to do on-demand runs of the pipelines, we explored the options for using Scheduling. Azure has two methods for scheduling: time-based and change-based. We chose to set our training pipeline on a time-based schedule and our model prediction pipeline on a change-based schedule.

Note: If there is a need to deactivate a pipeline, you must first disable the schedule associated with that endpoint.
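A minimal sketch of that order, assuming ws is already in scope and pipeline_id holds the published pipeline's ID:

from azureml.pipeline.core import PublishedPipeline
from azureml.pipeline.core.schedule import Schedule

published = PublishedPipeline.get(ws, id=pipeline_id)

# Disable every schedule attached to the pipeline first...
for sched in Schedule.list(ws):
    if sched.pipeline_id == published.id:
        sched.disable()

# ...then the published pipeline itself can be deactivated
published.disable()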

Time-Based Schedule

Creating a time-based schedule takes only a few lines of code. After you have your published pipeline, you can attach a schedule to the pipeline's endpoint by using the pipeline's ID. We set our training pipeline to a time-based schedule to retrain the model once a month's worth of data had been collected.

from azureml.pipeline.core.schedule import Schedule, ScheduleRecurrence

recurrence = ScheduleRecurrence(frequency="Month",
                                interval=1,
                                start_time="2020-01-01T09:00:00")
recurring_schedule = Schedule.create(ws,
                                     name="airbnb_training_pipeline_schedule",
                                     description="Schedule Training Pipeline to run on first day of every month",
                                     pipeline_id=training_pipeline_id,
                                     experiment_name=experiment.name,
                                     recurrence=recurrence)

Change-Based Schedule

To enable scheduling based on a change in our Blob container, we created a DataPath object and a PipelineParameter. The DataPath updates to the newly uploaded file's path, which becomes the input to the prediction pipeline. Along with the DataPath object, a DataPathComputeBinding needs to be defined to determine how the data will be consumed during a pipeline run. We set the mode to 'mount' to access the data in Blob rather than downloading the dataset; we recommend 'mount' when you have large amounts of data.

from azureml.data.datapath import DataPath, DataPathComputeBinding
from azureml.pipeline.core import PipelineParameter

data_path = DataPath(datastore=dstore,
                     path_on_datastore='listings_subset.csv')
datapath1_pipeline_param = PipelineParameter(name='input_data_path',
                                             default_value=data_path)
data_path_input = (datapath1_pipeline_param,
                   DataPathComputeBinding(mode='mount'))
string_pipeline_param = PipelineParameter(name='input_string',
                                          default_value='input_data.csv')

To enable a change-based schedule, we create a schedule as we did for the time-based schedule, with a few additional parameters: datastore, path_on_datastore, polling_interval, and data_path_parameter_name. The datastore and path_on_datastore define where to watch for changes that trigger a pipeline run. Our datastore is our Blob container, and we want to watch for any changes in the container, so the path is set to '/'. A change-based schedule only monitors the path given; it will not trigger based on changes in subfolders. The polling_interval is the time in minutes between checks for changes in the data source. The data_path_parameter_name is the name of the pipeline parameter defined above as datapath1_pipeline_param. Our prediction pipeline is set to check the airbnb-data container hourly for new data.

datastore = Datastore(workspace=ws, name="airbnb")
pipeline_id = pipeline.id
experiment_name3 = "blob_trigger_airbnb_predictions"

reactive_schedule = Schedule.create(ws,
                                    name="Blob_trigger_schedule",
                                    description="Based on input file change in airbnb-data container.",
                                    pipeline_id=pipeline_id,
                                    experiment_name=experiment_name3,
                                    datastore=datastore,
                                    path_on_datastore='/',
                                    polling_interval=60,
                                    data_path_parameter_name='input_data_path')

After setting up the schedule, you can check the status using the code below.

schedules = Schedule.list(ws)
for schedule in schedules:
    x = Schedule.get(ws, schedule.id)
    print("Schedule id {} status {}".format(x.id, x.status))

Both pipelines are now set to run automatically to provide predictions on new data and to retrain the model monthly.

Azure Machine Learning provides an easy way to create REST endpoints to deploy ML pipelines. Once you have created pipelines for model training and model predictions, you can determine the best deployment option to enable your model or pipeline to be consumed. With the Scheduler, you can set your pipelines to run weekly or as you receive data updates. If you are looking to build a web app, the on-demand call of the pipeline is a good choice to get predictions as needed. Once the output lands in a datastore, Power BI dashboards can be created to monitor and explore the results.

Ashley Peterson is a Consultant in Slalom’s Data and Analytics practice. Emily Huskins is an Analyst in Slalom’s Data and Analytics practice.

Slalom is a modern consulting firm focused on strategy, technology, and business transformation.
