Calling an Azure ML endpoint using Apache Airflow and the Azure ML provider package

How to score a machine learning model deployed to an Azure ML endpoint using Apache Airflow and an Azure ML job.

DataFairy
Towards Data Engineering
Nov 30, 2023

What this article is about

In this article we will create another pipeline that calls an Azure ML endpoint using httpx and Apache Airflow. This time we will call the endpoint from within Azure ML itself. We will look at the advantages of this method and at how to start an Azure ML job from Apache Airflow.

Check out my previous take on this topic:

Running predictions on AzureML

Using Azure ML has a few advantages:

  • Airflow is used as intended and only orchestrates the process
  • Setting up a custom Python environment is straightforward
  • Data is mounted in Azure ML (reads and writes are much faster)
  • Authentication is much simpler

Prerequisites:

  • A datastore mounted with sufficient access rights (SAS token or access key); use the access key if you get a stream-authentication error
  • A service principal to create the Azure ML connection in Airflow (see the sketch after this list)
  • An Azure ML endpoint with a registered model deployed and accepting traffic
  • Test data
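
With the service principal in hand, the Azure ML connection can be created in the Airflow UI or programmatically. Below is a minimal sketch that registers it from Python; the conn_type and the extra-field names are assumptions on my side, so double-check them against the airflow-provider-azure-machinelearning documentation:

import json

from airflow import settings
from airflow.models import Connection

# Sketch only: conn_type and the extra-field names are assumptions,
# verify them against the provider documentation.
conn = Connection(
    conn_id="azure-ml-ws-conn",
    conn_type="azure_machine_learning",
    login="<service-principal-client-id>",
    password="<service-principal-client-secret>",
    extra=json.dumps(
        {
            "tenantId": "<tenant-id>",
            "subscriptionId": "<subscription-id>",
            "resource_group_name": "<resource-group>",
            "workspace_name": "<workspace-name>",
        }
    ),
)

# Store the connection in the Airflow metadata database
session = settings.Session()
session.add(conn)
session.commit()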

The Airflow-Azure ML pipeline

The following pipeline consists of these steps:

  • Start
  • Create compute instance: creates a new compute instance using the Airflow Azure ML connection
  • Predict: runs the predict job on Azure ML
  • Delete compute instance: deletes the compute instance using the Airflow Azure ML connection
  • Success
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from azure.ai.ml import Input, command
from azure.ai.ml.entities import ComputeInstance
from azure.ai.ml.constants import AssetTypes, InputOutputModes

from airflow_provider_azure_machinelearning.operators.machine_learning.job import (
    AzureMachineLearningCreateJobOperator,
)
from airflow_provider_azure_machinelearning.operators.machine_learning.compute import (
    AzureMachineLearningCreateComputeResourceOperator,
    AzureMachineLearningDeleteComputeResourceOperator,
)

# Pipeline definition
with DAG(
    dag_id="AML_predict_job",
    schedule="0 17 * * *",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["AML predict"],
) as dag:
    # The Azure ML Airflow connection named azure-ml-ws-conn (created beforehand)
    connection_id = "azure-ml-ws-conn"

    # Define the compute instance
    compute1 = ComputeInstance(
        name="af-test-instance",
        size="Standard_D2s_v3",  # spec for a compute instance
    )

    # Task to create the compute instance
    amlc_create_1 = AzureMachineLearningCreateComputeResourceOperator(
        task_id="create_compute_instance",
        conn_id=connection_id,
        compute=compute1,
        waiting=True,
    )

    # Task to delete the compute instance
    amlc_delete_1 = AzureMachineLearningDeleteComputeResourceOperator(
        task_id="delete_compute_instance",
        conn_id=connection_id,
        compute_name=compute1.name,
    )

    # Location of the job source code inside the Airflow container
    # (copied there by the Dockerfile, see below)
    code_file_path = "/opt/airflow/jobs/predict/src"

    # Definition of the predict job with input parameters
    predict_command_job = command(
        code=code_file_path,
        command="python main.py --input_data_folder ${{inputs.input_data_folder}} --input_config_yaml ${{inputs.input_config_yaml}}",
        environment="credit-default-env:version_number",
        inputs={
            "input_config_yaml": Input(
                mode=InputOutputModes.RO_MOUNT,
                type=AssetTypes.URI_FILE,
                # path to the config file on the mounted blob storage
                path="azureml://datastores/xxx/paths/credit_defaults_model/config/config.yml",
            ),
            "input_data_folder": Input(
                mode=InputOutputModes.RO_MOUNT,
                type=AssetTypes.URI_FOLDER,
                # path to the sample request folder on the mounted blob storage
                path="azureml://datastores/xxx/paths/credit_defaults_model/data",
            ),
        },
        compute=compute1.name,
        display_name="predict_from_airflow",
        experiment_name="testing-airflow",
        description="predict command job",
    )

    # Job creation task
    predict_task = AzureMachineLearningCreateJobOperator(
        task_id="predict",
        job=predict_command_job,
        waiting=True,
        conn_id=connection_id,
    )

    start_task = EmptyOperator(task_id="start")
    success_task = EmptyOperator(task_id="success")

    start_task >> amlc_create_1 >> predict_task >> amlc_delete_1 >> success_task

This example depends on the airflow-provider-azure-machinelearning package on PyPI. It is still an early release, so expect some bugs.
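
The provider and the Azure ML SDK it builds on go into the Airflow image via requirements.txt (left unpinned here; pin whatever versions work for you):

airflow-provider-azure-machinelearning
azure-ai-ml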

The job definition:

Apart from the job definition, the other steps in the pipeline are straightforward.

# Location of the job source code inside the Airflow container
# (copied there by the Dockerfile, see below)
code_file_path = "/opt/airflow/jobs/predict/src"

# Definition of the predict job with input parameters
predict_command_job = command(
    code=code_file_path,
    command="python main.py --input_data_folder ${{inputs.input_data_folder}} --input_config_yaml ${{inputs.input_config_yaml}}",
    environment="credit-default-env:version_number",
    inputs={
        "input_config_yaml": Input(
            mode=InputOutputModes.RO_MOUNT,
            type=AssetTypes.URI_FILE,
            # path to the config file on the mounted blob storage
            path="azureml://datastores/xxx/paths/credit_defaults_model/config/config.yml",
        ),
        "input_data_folder": Input(
            mode=InputOutputModes.RO_MOUNT,
            type=AssetTypes.URI_FOLDER,
            # path to the sample request folder on the mounted blob storage
            path="azureml://datastores/xxx/paths/credit_defaults_model/data",
        ),
    },
    compute=compute1.name,
    display_name="predict_from_airflow",
    experiment_name="testing-airflow",
    description="predict command job",
)

The Airflow job definition is very similar to a command job defined directly in Azure ML. One difference is that we have to make sure Airflow can find the source files we want to run in the job (jobs/predict/src).

# Absolute path inside the Airflow container where the job code was copied
code_file_path = "/opt/airflow/jobs/predict/src"

The source files are made available at that location by the Dockerfile:

FROM quay.io/astronomer/astro-runtime:9.1.0

COPY requirements.txt /opt/airflow/requirements.txt

COPY dags/jobs /opt/airflow/jobs
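
For orientation, the project layout assumed by these paths looks roughly like this (the DAG file name is illustrative):

.
├── Dockerfile
├── requirements.txt
└── dags/
    ├── aml_predict_dag.py        # the DAG shown above
    └── jobs/
        └── predict/
            └── src/
                ├── main.py
                └── inference/
                    └── utils.py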

Authorization:

According to the Azure GitHub documentation you need a service principal with Contributor rights at the resource-group level and, digging a bit deeper, also Storage Blob Data Reader on the storage account. I found that this was not sufficient: with that setup I got a StreamAccessAuthentication error. The solution for me was to mount the blob storage in Azure ML using an access key rather than just a SAS token.
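
If you hit the same error, re-registering the datastore with account-key credentials can be done in the Azure ML studio or, for example, with the Azure ML Python SDK v2. A minimal sketch, with placeholder names:

from azure.ai.ml import MLClient
from azure.ai.ml.entities import AzureBlobDatastore, AccountKeyConfiguration
from azure.identity import DefaultAzureCredential

# Connect to the workspace
ml_client = MLClient(
    credential=DefaultAzureCredential(),
    subscription_id="<subscription-id>",
    resource_group_name="<resource-group>",
    workspace_name="<workspace-name>",
)

# Register (or update) the blob datastore with an account key instead of a SAS token
blob_datastore = AzureBlobDatastore(
    name="xxx",  # the datastore name used in the azureml:// paths above
    account_name="<storage-account-name>",
    container_name="<container-name>",
    credentials=AccountKeyConfiguration(account_key="<storage-account-key>"),
)

ml_client.create_or_update(blob_datastore)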

main.py:

import argparse
import os
import sys

sys.path.append(os.path.dirname(os.path.realpath(__file__)))
from inference.utils import (
    define_headers,
    load_config,
    call_endpoint_with_requests,
    parse_requests,
)


def parse_args():
    """Parse input arguments"""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_config_yaml", type=str, help="config location on azure"
    )
    parser.add_argument(
        "--input_data_folder", type=str, help="data location on azure"
    )
    return parser.parse_args()


def main(args):
    cfg = load_config(args.input_config_yaml)
    headers = define_headers(cfg)

    # Define the location of the sample data
    paths = [
        os.path.join(args.input_data_folder, "sample_request_1.json"),
        os.path.join(args.input_data_folder, "sample_request_2.json"),
    ]

    # Load the requests from the json files
    request_items = parse_requests(paths)

    return call_endpoint_with_requests(request_items, headers, cfg, "httpx")


if __name__ == "__main__":
    args = parse_args()

    response = main(args)
    print(response)
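
The helpers imported from inference.utils live in the repository linked below. As a rough illustration of what they do, here is a sketch; the config keys endpoint_url and api_key are my own assumptions, not necessarily the names used in the repo:

import json

import httpx
import yaml


def load_config(path: str) -> dict:
    """Read the YAML config mounted from the datastore."""
    with open(path) as f:
        return yaml.safe_load(f)


def define_headers(cfg: dict) -> dict:
    """Build the headers for the endpoint call."""
    return {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {cfg['api_key']}",
    }


def parse_requests(paths: list[str]) -> list[dict]:
    """Load the sample request payloads from the json files."""
    items = []
    for path in paths:
        with open(path) as f:
            items.append(json.load(f))
    return items


def call_endpoint_with_requests(request_items, headers, cfg, client="httpx"):
    """Score each payload against the Azure ML endpoint and collect the responses."""
    # The `client` argument selects the HTTP library in the repo; only httpx is sketched here.
    responses = []
    for item in request_items:
        r = httpx.post(cfg["endpoint_url"], headers=headers, json=item, timeout=60)
        r.raise_for_status()
        responses.append(r.json())
    return responses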

GitHub link:

datafairy-azure/inference: Code for inference pipelines in Airflow (github.com)

Summary

In this article we looked at how to run Azure ML jobs using Apache Airflow. We created an Airflow pipeline that creates and deletes compute instances in Azure ML and runs jobs on them. We accessed data from blob storage mounted in Azure ML. Finally, we ran a predict job that calls an Azure ML endpoint with sample requests.

Note

I am still experimenting a lot with Apache Airflow and Azure ML and would really appreciate your feedback. I am also very interested in other approaches, questions or ideas on how to improve the workflows I show here. Let me know what you think!

If you found this article useful, please follow me.

