Async Streaming with Azure OpenAI and Python Fast API

Rukaya Ogazi-Khan
Version 1
Published in
5 min readApr 30, 2024

Introduction

User Requirement: I want to receive responses from OpenAI in real-time, with each word returned as soon as it’s available.

Motivations: Returning responses in real-time mimics human-like interaction. This makes the conversation feel more natural and improves the user experience.

User Journey

Solution Tech Stack

Underlying Technologies

Server-Sent Events (SSE)

SSE is a technology that allows a client to create a persistent connection with a server so that data can be pushed over HTTP in real-time.

In the context of our application, using SSE allows our Python Application to send each word of the response to the user as soon as it’s generated. Having this real-time response provides a more natural and engaging conversational experience.

To learn more about SSE I would recommend watching Server-Sent Events Crash Course

Asynchronous

Asynchronous programming allows tasks to be executed independently. So that the completion of task A does not block task B from being executed.

In the context of our application, this allows our Python Application to send each word of the reponse to the user while still retrieveing the next part of the response from Azure OpenAI.

To learn more about Asynchronous Programming I would recommend watching Synchronous vs Asynchronous Applications (Explained by Example)

FastAPI

FastAPI is a web framework used for building APIs with Python.

In the context of our application, we use it to create an API that our users can send prompts to. The API then passes these prompts to Azure OpenAI, which streams each word of the response back to the user as it is generated.

To learn more about Asynchronous Programming I would recommend watching FastAPI Introduction — Build Your First Web App — Python Tutorial

The Infrastructure

The Terraform code deploys a Resource Group with an Azure OpenAI instance in it and a model attached to the instance.

locals {
open_ai_instance_models = flatten([
for instance in var.open_ai_instances : [
for model in instance.models : {
instance_name = instance.name
model_name = model.name
model_version = model.version
}
]
])
}

resource "azurerm_resource_group" "resource_group" {
name = var.resource_group_name
location = var.location
}

resource "azurerm_cognitive_account" "ai_services" {
for_each = { for open_ai_instance in var.open_ai_instances : open_ai_instance.name => open_ai_instance }

name = each.value.name
location = each.value.region
resource_group_name = azurerm_resource_group.resource_group.name
kind = "OpenAI"
sku_name = each.value.sku
custom_subdomain_name = each.value.custom_subdomain_name
public_network_access_enabled = true

}

resource "azurerm_cognitive_deployment" "model" {
for_each = { for open_ai_instance_model in local.open_ai_instance_models : open_ai_instance_model.model_name => open_ai_instance_model }

name = each.value.model_name
cognitive_account_id = azurerm_cognitive_account.ai_services[each.value.instance_name].id

model {
format = "OpenAI"
name = each.value.model_name
version = each.value.model_version
}

scale {
type = "Standard"
}
}

An example input to this deployment is below. You should update the vars.tfvars file with your desired values.

location            = "uksouth"
resource_group_name = "azure-open-ai-rg"

open_ai_instances = [
{
name = "dev-openai-1"
region = "uksouth"
sku = "S0"
custom_subdomain_name = "ai-service-dev-openai-1"
models = [
{
name = "gpt-35-turbo"
version = "0301"
},
]
},
]

From the Azure Portal grab the Endpoint, Keys and the Deployment Model name.

These values are also returned by your terraform output

The Code

Application

The first part of the application code:

  1. Initialises your FastAPI application
  2. Sets up authentication to Azure OpenAI
  3. Creates a Prompt model that defines the type of input that can be recieve from the user. In this case — text.

The app is now set up to receive input prompts and interact with Azure OpenAI.

import os
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import openai
import uvicorn

# App
app = FastAPI()

# Azure OpenAI Authentication
endpoint = os.environ["AZURE_OPEN_AI_ENDPOINT"]
api_key = os.environ["AZURE_OPEN_AI_API_KEY"]

client = openai.AsyncAzureOpenAI(
azure_endpoint=endpoint,
api_key=api_key,
api_version="2023-09-01-preview"
)

# Azure OpenAI Model Configuration
deployment = os.environ["AZURE_OPEN_AI_DEPLOYMENT_MODEL"]
temperature = 0.7

# Prompt
class Prompt(BaseModel):
input: str

The second part of the application code sets up the API that streams Azure OpenAI responses back to the user. Here:

  1. The stream function takes the users input (i.e. prompt) and makes an asynchronous call to Azure Open AI to get a response.
  2. The stream_processor function asynchronously processes the response from Azure OpenAI.
  3. The stream function response is of type “StreamingResponse” which allows SSE technologies to stream the response back word by word.
# Generate Stream
async def stream_processor(response):
async for chunk in response:
if len(chunk.choices) > 0:
delta = chunk.choices[0].delta
if delta.content:
yield delta.content


# API Endpoint
@app.post("/stream")
async def stream(prompt: Prompt):
azure_open_ai_response = await client.chat.completions.create(
model=deployment,
temperature=temperature,
messages=[{"role": "user", "content": prompt.input}],
stream=True
)

return StreamingResponse(stream_processor(azure_open_ai_response), media_type="text/event-stream")

The final part of the code runs the application making it available for users to call.

if __name__ == "__main__":
uvicorn.run("main:app", port=8000)

Testing

To test the application you can run the below curl command. It is important to note:

  1. As the application needs to be running while you execute this curl command, you should run the command in a seperate terminal
  2. You can change the input prompt to the API
  3. You may have to change the application IP address depending on the default IP address the application runs on with your machine.
curl -L \
-H "Accept: application/json" \
-H "Connection: close" \
-H "Content-type: application/json" \
-X POST -d '{"input": "write a random 100 word paragraph"}' \
http://127.0.0.1:8000/stream --no-buffer \
--verbose

Automation

The creation of the infrastructure and application can be automated in several ways. In the following GitHub Repo I use a Makefile for automation. You can follow the README in this repo to create the infrastructure and run this application yourself.

Summary

Putting it all together, we now have a solution that fulfills our user requirement — our API retrieves response from Azure OpenAI in real-time and returns each word to the user as soon as it’s available. This allows your application to mimic human responses and ultimately enhances the user experience.

To build on this solution you can consider integrating the terraform code with your Azure Landing Zone and work towards productionising your application by creating a front-end (e.g. a chatbot) and setting up automated deployment pipelines.

About the Author:

Rukaya Ogazi-Khan is an Azure architect at Version 1.

--

--