Re-creating an AWS inference architecture based on SageMaker and Airflow in Azure

How I took an AWS MLOps architecture and transformed it to work in Azure using Airflow and Azure ML.

DataFairy
Towards Data Engineering
7 min read · Dec 11, 2023


Introduction

About two months ago I was asked whether it is possible to run an inference pipeline using Azure Managed Airflow and Azure Machine Learning Studio. I had never attempted that before and got curious about a possible solution. After a short Google search (yes, I still do that ;)) I came across a Medium post where the authors built exactly what I was looking for, but in AWS. That gave me a general answer: it is possible to build an inference pipeline using Apache Airflow and an ML tool. But can I do the same in Azure? Are the tools available, and how well can they talk to each other? Let’s find out.

The original setup

In the original architecture they used various AWS tools:

  • Storage bucket
  • AWS SageMaker
  • SQS Queue
  • AWS Eventbridge
  • ML Compute Instance
  • Airflow installed on an EC2 instance
The architecture diagram is taken from the original article, which is linked at the bottom of this post.

They used a Kaggle competition dataset with Tweets data and the HuggingFace BERT model. For my use case, I will reuse a model I had already registered along with the sample requests from a previous article, for simplicity. The input data and the model shouldn’t make much of a difference to the overall architecture.

Azure Setup

For the setup in Azure I will be using the following tools:

  • Managed Airflow/local Airflow instance
  • Blob Storage
  • Azure ML Studio
  • Compute Instance
  • Azure Service Bus

Architecture in Azure

I am reusing a previous Airflow pipeline for inference where I create and clean up the compute instance in Azure ML.

I will be using Pydantic to load, clean and prepare the data. That’s simply code I already have ready to go. It could be done in a much simpler way, but I prefer to show you the solution I would implement myself.
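
To give an idea of what that looks like, here is a minimal sketch of a Pydantic model for the sample request payloads used later in this article. The real `Request` model lives in the repository linked at the bottom of this post; the field names below are simply derived from the sample JSON and should be treated as an illustration, not the actual implementation.

import json
from typing import List

from pydantic import BaseModel


class InputData(BaseModel):
    columns: List[int]
    index: List[int]
    data: List[List[float]]


class Request(BaseModel):
    input_data: InputData


# Parsing one of the raw JSON request strings into a validated object:
raw = '{"input_data": {"columns": [0, 1], "index": [0], "data": [[20000, 2]]}}'
request = Request(**json.loads(raw))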

Prerequisites:

  • A previously registered model and an Azure ML resource.
  • New requests are made available to Airflow through a blob storage connection configured in the Airflow admin panel (see the sketch below for how such a connection can be used from a task).
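
As an illustration of that second prerequisite, a task could pick up new request files through the blob storage connection roughly like this. The connection ID, container and prefix are placeholders rather than values from this article; the hook comes with the Microsoft Azure provider package for Airflow.

# A minimal sketch of listing new requests via the blob storage connection.
# "azure-blob-conn", the container and the prefix are hypothetical placeholders.
from typing import List

from airflow.providers.microsoft.azure.hooks.wasb import WasbHook


def list_new_request_blobs(container: str = "inference-input") -> List[str]:
    hook = WasbHook(wasb_conn_id="azure-blob-conn")
    # returns the names of all blobs under the given prefix
    return hook.get_blobs_list(container_name=container, prefix="requests/")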

The process:

  • The data is loaded and cleaned in Airflow and then pushed to a specific location on blob storage.
  • While the requests are being prepared, the compute instance is created.
  • Next, the inference job is triggered by Airflow to run on Azure ML.
  • The job takes the registered model and the new request data as inputs.
  • After every input dataset has been scored, the job sends the results to Azure Service Bus.
  • Once the job has succeeded, Airflow automatically moves on to the next step. There is no need to track the status of the job for now.
  • In the next step the messages are read from the queue.
  • At the same time, all resources created by Airflow are removed again.

Remarks:

In the above example I am using Azure Service Bus to track the job status and to get the prediction results. The Airflow task that runs the job already tracks the job status, so strictly speaking I don’t need Service Bus for that. Nevertheless, I thought it was an interesting challenge to use Azure Service Bus, so I decided to send the predictions as messages to the queue and to retrieve them in the following step.

Various ways to send messages to Azure Service Bus with Python:

# servicebus.py

import json
from typing import List

from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
from azure.identity.aio import DefaultAzureCredential


async def send(sb_name: str, queue_name: str, type: str, credential: DefaultAzureCredential, messages: List[dict]):
    """Send messages to the Service Bus queue."""
    async with ServiceBusClient(
        fully_qualified_namespace=f"sb://{sb_name}.servicebus.windows.net",
        credential=credential,
        logging_enable=True,
    ) as servicebus_client:
        sender = servicebus_client.get_queue_sender(queue_name=queue_name)
        async with sender:
            if type == "single":
                await send_single_message(sender, messages[0])
            elif type == "list":
                await send_a_list_of_messages(sender, messages)
            elif type == "batch":
                await send_batch_message(sender, messages)
            else:
                raise ValueError("Invalid type")
    await credential.close()


async def send_single_message(sender, single_message: dict):
    """Create a Service Bus message and send it to the queue."""
    message = ServiceBusMessage(json.dumps(single_message))
    await sender.send_messages(message)


async def send_a_list_of_messages(sender, message_list: List[dict]):
    """Create a list of messages and send them to the queue."""
    messages = [ServiceBusMessage(json.dumps(message)) for message in message_list]
    await sender.send_messages(messages)


async def send_batch_message(sender, message_list: List[dict]):
    """Create a batch of messages and send it to the queue."""
    async with sender:
        batch_message = await sender.create_message_batch()
        for message in message_list:
            try:
                batch_message.add_message(ServiceBusMessage(json.dumps(message)))
            except ValueError:
                # The ServiceBusMessageBatch object has reached its max_size.
                # A new ServiceBusMessageBatch object can be created here to send more data.
                break
        await sender.send_messages(batch_message)
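
To show how this module fits into the scoring job, here is a minimal sketch of pushing a list of predictions to the queue. The namespace name and the prediction payloads are made-up placeholders, and the import assumes servicebus.py sits next to the job’s main.py; DefaultAzureCredential picks up the service principal variables that are passed to the job as environment variables further below.

# Hypothetical usage from the scoring job: push predictions to the queue.
import asyncio

from azure.identity.aio import DefaultAzureCredential

from servicebus import send  # assumes servicebus.py sits next to main.py

predictions = [
    {"request_id": "req-001", "prediction": 0},  # placeholder payloads
    {"request_id": "req-002", "prediction": 1},
]

asyncio.run(
    send(
        sb_name="my-servicebus-namespace",  # placeholder namespace name
        queue_name="test_queue",
        type="list",
        credential=DefaultAzureCredential(),
        messages=predictions,
    )
)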

Azure Service Bus Airflow Connection:

I don’t pass a connection ID to the Azure Service Bus task directly; instead it is set once for all tasks via azure_service_bus_conn_id in the DAG’s default_args. The connection itself still needs to be created in the Airflow admin panel.
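
If you prefer not to click through the UI, the connection can also be registered in the Airflow metastore from code. The sketch below is an assumption based on the Azure provider’s azure_service_bus connection type; the conn_type and the "connection_string" extra key may differ between provider versions, so verify them against the provider documentation before using this.

# Rough sketch: register the Service Bus connection programmatically.
# The conn_type and the "connection_string" extra key are assumptions;
# check the Microsoft Azure provider docs for your version.
import json

from airflow.models import Connection
from airflow.utils.session import create_session

sb_conn = Connection(
    conn_id="azure-sb-conn",
    conn_type="azure_service_bus",
    extra=json.dumps({"connection_string": "<service-bus-connection-string>"}),
)

with create_session() as session:
    session.add(sb_conn)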

The pipeline:

import json
import os
from typing import List

import pendulum

from inference.etl import load_data, prepare_requests, clean_request
from inference.request import Request

from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.azure.operators.asb import (
    AzureServiceBusReceiveMessageOperator,
)
from azure.ai.ml import Input, command
from azure.ai.ml.entities import ComputeInstance
from azure.ai.ml.constants import AssetTypes, InputOutputModes
from airflow_provider_azure_machinelearning.operators.machine_learning.job import (
    AzureMachineLearningCreateJobOperator,
)
from airflow_provider_azure_machinelearning.operators.machine_learning.compute import (
    AzureMachineLearningCreateComputeResourceOperator,
    AzureMachineLearningDeleteComputeResourceOperator,
)

with DAG(
    dag_id="AML_predict_job",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule="0 17 * * *",
    tags=["AML predict"],
    default_args={
        # the Service Bus connection id is set once here for all tasks in the DAG
        "azure_service_bus_conn_id": "azure-sb-conn",
    },
) as dag:
    # set the connection to the Azure ML workspace and provide the model information
    connection_id = "azure-ml-ws-conn"
    model_info = "credit-default-model:1"
    queue_name = "test_queue"
    conda_environment = "environ:version"

    def get_list_of_raw_data() -> List[str]:
        """
        #### Dummy list of raw data.
        """
        return [
            '{"input_data": {"columns": [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22],"index": [0, 1],"data": [[20000,2,2,1,24,2,2,-1,-1,-2,-2,3913,3102,689,0,0,0,0,689,0,0,0,0],[10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 10, 9, 8]]}}',
            '{"input_data": {"columns": [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22],"index": [0, 1],"data": [[3913,3102,689,0,0,0,0,689,0,0,0,0,20000,2,2,1,24,2,2,-1,-1,-2,-2],[10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 10, 9, 8]]}}',
            '{"input_data": {"columns": [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22],"index": [0, 1],"data": [[3913,3102,-689,0,0,0,0,-689,0,0,0,0,20000,2,2,1,-24,2,-2,1,1,2,2],[10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 10, 9, 8]]}}',
        ]

    def load_data_task() -> List[Request]:
        """
        #### Extract task
        """
        ordered_data = load_data(get_list_of_raw_data())

        requests: List[Request] = [Request(**item) for item in ordered_data]

        return requests

    def clean_data_task(task_instance, **kwargs) -> List[str]:
        """
        #### Clean data task
        """
        cleaned_requests = []

        requests = task_instance.xcom_pull(task_ids="load_data_task")

        for request in requests:
            cleaned_requests.append(json.dumps(clean_request(request)))

        return cleaned_requests

    def prepare_requests_task(task_instance, **kwargs):
        """
        #### Load task: load cleaned data into database
        """
        cleaned_requests = task_instance.xcom_pull(task_ids="clean_data_task")

        request_locations = prepare_requests(cleaned_requests)

        return request_locations

    load_data_task = PythonOperator(task_id="load_data_task", python_callable=load_data_task)
    clean_data_task = PythonOperator(task_id="clean_data_task", python_callable=clean_data_task)
    prepare_requests_task = PythonOperator(task_id="prepare_requests_task", python_callable=prepare_requests_task)

    compute1 = ComputeInstance(
        name="compute-321-inference",
        size="Standard_D2s_v3",  # spec for a compute instance
    )

    amlc_create_1 = AzureMachineLearningCreateComputeResourceOperator(
        task_id="create_compute_instance",
        conn_id=connection_id,
        compute=compute1,
        waiting=True,
    )

    amlc_delete_1 = AzureMachineLearningDeleteComputeResourceOperator(
        task_id="delete_compute_instance",
        conn_id=connection_id,
        compute_name=compute1.name,
    )

    # The service principal will need the Azure Service Bus Data Owner role.
    # Fill in the information about the service principal below.
    env_servicebus = {
        "AZURE_CLIENT_SECRET": "",
        "AZURE_TENANT_ID": "",
        "AZURE_CLIENT_ID": "",
    }

    # define the path to the /src folder containing the main.py file
    curr_dir = os.path.dirname(os.path.abspath(__file__))
    code_file_path = os.path.join(curr_dir, "/opt/airflow/jobs/predict/src")

    predict_command_job = command(
        code=code_file_path,
        command="python main.py --input_data_folder ${{inputs.input_data_folder}} --input_config_yaml ${{inputs.input_config_yaml}}",
        environment=conda_environment,
        inputs={
            "input_config_yaml": Input(
                mode=InputOutputModes.RO_MOUNT,
                type=AssetTypes.URI_FILE,
                path="azureml://datastores/xxx/paths/credit_defaults_model/config/config.yml",
            ),
            "input_data_folder": Input(
                mode=InputOutputModes.RO_MOUNT,
                type=AssetTypes.URI_FOLDER,
                path="azureml://datastores/xxx/paths/inference_input",
            ),
        },
        environment_variables=env_servicebus,
        compute=compute1.name,
        display_name="predict_from_airflow",
        experiment_name="testing-airflow",
        description="predict command job",
    )

    predict_task = AzureMachineLearningCreateJobOperator(
        task_id="predict",
        job=predict_command_job,
        waiting=True,
        conn_id=connection_id,
    )

    check_job_status = AzureServiceBusReceiveMessageOperator(
        task_id="receive_message_service_bus_queue",
        queue_name=queue_name,
        max_message_count=20,
        max_wait_time=5,
    )

    start_task = EmptyOperator(task_id="start")
    success_task = EmptyOperator(task_id="success")

    start_task >> amlc_create_1 >> predict_task >> [amlc_delete_1, check_job_status] >> success_task
    start_task >> load_data_task >> clean_data_task >> prepare_requests_task >> predict_task
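
To try the whole DAG out locally before deploying it to Managed Airflow, the DAG file can be run directly with Airflow’s debug facility (available from Airflow 2.5 onwards), assuming the Azure ML and Service Bus connections exist in the local metastore:

# Quick local run of the DAG (Airflow 2.5+). This assumes the azure-ml-ws-conn
# and azure-sb-conn connections exist in the local Airflow metastore.
if __name__ == "__main__":
    dag.test()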

Results:

Service Bus Task output:

Airflow logs with service bus messages.

Summary:

In this article we looked at how to create an MLOps inference pipeline in Azure, using a setup in AWS as inspiration. We identified the components used in AWS and recreated a similar infrastructure in Azure using Airflow, Azure ML and Azure Service Bus. We used Pydantic for data validation and checked our predictions by reading the messages from the Service Bus queue.

Github:

datafairy-azure/inference: Code for inference pipelines in airflow (github.com)

AWS Setup:

Automate and streamline our ML inference pipeline with SageMaker and Airflow | MLearning.ai (medium.com)

Note

I am still experimenting a lot with Apache Airflow and Azure ML and I would really appreciate your feedback. I am also very much interested in other approaches, questions or ideas on how to improve the workflows I show here. Let me know what you think!

If you found this article useful, please follow me.
