Streaming OpenAI with FastAPI and Consuming It with React.js

Huan Xu
6 min read · Mar 29, 2023


Problem Statement

Display an OpenAI stream in a React.js frontend application using FastAPI as the backend.

Smooth 👌

In this blog post, we will focus on serving an OpenAI stream using FastAPI as a backend, with the intention of displaying the stream in a React.js frontend, similar to ChatGPT’s procedural fashion.

Stream OpenAI with FastAPI, and render it with React.js

Step 1: the stream parameter from OpenAI Documentation

The OpenAI Chat Completion API documentation provides details on the stream parameter. When stream is set to True, the openai.ChatCompletion.create function returns partial message deltas instead of a complete message in the response. These partial responses are sent as data-only server-sent events (SSE), which you can read more about on the Mozilla Developer page. If you’re using React.js on the client side and FastAPI on the server side, you can implement real-time updates with SSE quite easily.

Server-Sent Events (SSE) is an HTTP standard that lets a client application automatically receive updates or event streams from the server once an initial connection has been established. It is a server push technology: data flows to the client over a single HTTP connection, and the communication is unidirectional, i.e., only the server can push updates to the client. SSE is commonly used to send automatic updates or continuous data streams to a browser client.
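Concretely, a data-only SSE stream is just a sequence of lines prefixed with data: and separated by blank lines. For illustration only (these payloads are made up), two consecutive events on the wire might look like:

data: {"content": "Hello"}

data: {"content": " world"}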

The following is the example code from OpenAI Cookbook about creating a Chat Completion request with stream=True:

import openai

# A ChatCompletion request
response = openai.ChatCompletion.create(
    model='gpt-3.5-turbo',
    messages=[
        {'role': 'user', 'content': "What's 1+1? Answer in one word."}
    ],
    temperature=0,
    stream=True  # this time, we set stream=True
)

for chunk in response:
    print(chunk)

The streamed chunks contain a delta field instead of the message field you get when stream=False:

{
    "choices": [
        {
            "delta": {
                "content": "\n\n"
            }
        }
    ],
    ...
}

These delta fields can hold different types of values:

  • A role token, e.g., {"role": "assistant"}
  • A content token, e.g., {"content": "\n\n"}
  • An empty object, e.g., {}, when the stream is over

The content token will be the main focus of our actual implementation, as it indicates there is new content (a delta) in the OpenAI stream. This code example demonstrates how to work with the OpenAI stream directly; next, we want to integrate it with FastAPI.
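Before wiring this into FastAPI, here is a minimal sketch that stitches these content deltas back into the complete reply (assuming a freshly created response stream like the one above, since a stream can only be iterated once):

collected_content = []
for chunk in response:
    delta = chunk["choices"][0]["delta"]
    if "content" in delta:
        collected_content.append(delta["content"])

# join the deltas to reconstruct the full answer
full_reply = "".join(collected_content)
print(full_reply)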

Step 2: Combine Server-Sent Events with FastAPI

In this step, we will create a FastAPI backend to serve OpenAI streams by 1) converting the raw OpenAI stream into a Python generator function and 2) utilizing the built-in StreamingResponse class to serve SSE with FastAPI.

Converting OpenAI Raw Stream into a Python Generator Function

We first define a generator function, get_openai_generator, which takes a prompt as input and initializes a streaming Chat Completion request using the OpenAI API:

def get_openai_generator(prompt: str):
    openai_stream = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.0,
        stream=True,
    )

    for event in openai_stream:
        if "content" in event["choices"][0].delta:
            current_response = event["choices"][0].delta.content
            yield current_response

The generator function iterates through each element of the OpenAI stream response. For every event, it checks for the "content" key in the delta object; if present, it extracts the content and uses yield to return it, which is what makes the function a generator.

This generator function can be tested in a Jupyter Notebook by running:

openai_generator = get_openai_generator("what's the answer to 1+1 = 2? Do it step by step")
for chunk in openai_generator:
    print(chunk, end='', flush=True)

The output will display the procedural generation of the response.

stream output in Jupyter Notebook

Serving the OpenAI Generator as an API Route

Now we proceed to serve the generator function within FastAPI by returning the generator as a StreamingResponse.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import openai

app = FastAPI()

def get_openai_generator(prompt: str):
    openai_stream = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.0,
        stream=True,
    )
    for event in openai_stream:
        if "content" in event["choices"][0].delta:
            current_response = event["choices"][0].delta.content
            yield "data: " + current_response + "\n\n"

@app.get('/stream')
async def stream(prompt: str = "What's 1+1? Answer in one word."):
    # prompt is read from the query string; the default keeps the example easy to test
    return StreamingResponse(get_openai_generator(prompt), media_type='text/event-stream')

In the stream() handler for the /stream endpoint, we take the prompt from the query string (with a default for easy testing) and return the get_openai_generator function wrapped in a StreamingResponse. The content type for this response is set to text/event-stream, which is suitable for SSE.

⚠️ Important: Note that unlike the Jupyter Notebook example, the generator function here formats current_response as "data: " + current_response + "\n\n". This step is important, as frontend SSE libraries specifically look for the data: prefix and the blank line that terminates each event. Without this formatting, the frontend will ignore all streamed messages.
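Also worth noting: the React examples later in this post call JSON.parse on each event’s data. If you plan to parse on the client, it helps to JSON-encode the payload on the server instead of yielding raw text. A minimal variation of the same generator (same assumptions as above):

import json

def get_openai_generator(prompt: str):
    openai_stream = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.0,
        stream=True,
    )
    for event in openai_stream:
        if "content" in event["choices"][0].delta:
            current_response = event["choices"][0].delta.content
            # JSON-encode the payload so JSON.parse(e.data) works on the client
            yield "data: " + json.dumps({"content": current_response}) + "\n\n"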

To test the /stream endpoint, we can use the following snippet with the requests library:

import requests

url = "http://localhost:8000/stream"

with requests.get(url, stream=True) as r:
    for chunk in r.iter_content(None, decode_unicode=True):
        if chunk:
            print(chunk, end='', flush=True)

With these steps, you now have a running FastAPI backend that serves OpenAI streams.
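One practical note before moving to the frontend: if the React app runs on a different origin than the FastAPI server (for example localhost:3000 versus localhost:8000), the browser will block the SSE request unless CORS is configured. A minimal sketch using FastAPI’s built-in CORSMiddleware (the allowed origin is an assumption; adjust it to your setup):

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],  # assumed React dev server origin
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

You can then start the server with uvicorn, e.g. uvicorn main:app --reload, assuming the code lives in main.py.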

Step 3: Consume and render SSE in React.js

In this step, we’ll explore two ways to consume server-sent events (SSE) in a React.js application.

Option 1: EventSource API (MDN docs)

Pros: Supported by most browsers.

Cons: The EventSource API has many limitations: it can only use the GET method, cannot send a request body, cannot set request headers for authorization, etc. If you rely on an authorization header, you may find it hard to adapt to the EventSource API, according to this GitHub thread. If that’s you, skip to Option 2 for more flexibility.

Implementation

reference: Implementing Server-Sent Events in React.js — Medium

First, initialize the EventSource in your component; you’ll need your backend endpoint URL here. The withCredentials parameter allows the EventSource to carry cookies and user session information when making a request:

const sse = new EventSource('[SSE_URL]', { withCredentials: true });

Use the onmessage handler to read the data streams from the backend. This handler also validates and parses the incoming data:

function getRealtimeData(data) {
  // Process the data here
  // Pass it to state to be rendered
}

sse.onmessage = e => getRealtimeData(JSON.parse(e.data));

Below is the complete code:

useEffect(() => {
  const sse = new EventSource('[YOUR_SSE_ENDPOINT_URL]', { withCredentials: true });

  function getRealtimeData(data) {
    // Process the data here
    // Then pass it to state to be rendered
  }

  sse.onmessage = e => getRealtimeData(JSON.parse(e.data));

  sse.onerror = () => {
    // Error log here

    sse.close();
  };

  return () => {
    sse.close();
  };
}, [YOUR_DEPENDENCIES_HERE]);

Option 2: @microsoft/fetch-event-source

This package provides a higher-level abstraction for working with SSE in React.js applications. It allows you to use POST method, set request headers, request body, etc. You can check out its full capability in its NPM docs.

Implementation

reference: Using fetch-event-source with Server-Sent Events and React — LogRocket Blog

The syntax is quite similar to the Fetch API, with additional onopen, onmessage, and onclose handlers:

import { fetchEventSource } from '@microsoft/fetch-event-source';

useEffect(() => {
  const fetchData = async () => {
    await fetchEventSource(`${serverBaseURL}/sse`, {
      method: "POST",
      headers: { Accept: "text/event-stream" },
      onopen(res) {
        if (res.ok && res.status === 200) {
          console.log("Connection made ", res);
        } else if (res.status >= 400 && res.status < 500 && res.status !== 429) {
          console.log("Client-side error ", res);
        }
      },
      onmessage(event) {
        console.log(event.data);
        const parsedData = JSON.parse(event.data);
        // Important to append to the previous state this way; otherwise earlier
        // chunks may be overwritten if the stream is too fast
        setData((data) => [...data, parsedData]);
      },
      onclose() {
        console.log("Connection closed by the server");
      },
      onerror(err) {
        console.log("There was an error from server", err);
      },
    });
  };
  fetchData();
}, []);

Troubleshooting

onopen and onclose were triggered, but onmessage was never reached? Make sure your stream API’s output format is correct. For FastAPI, the generator function should format OpenAI’s current_response as "data: " + current_response + "\n\n". See the following code snippet:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import openai

app = FastAPI()

def get_openai_generator(prompt: str):
    openai_stream = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.0,
        stream=True,
    )
    for event in openai_stream:
        if "content" in event["choices"][0].delta:
            current_response = event["choices"][0].delta.content
            # important format
            yield "data: " + current_response + "\n\n"

@app.get('/stream')
async def stream(prompt: str = "What's 1+1? Answer in one word."):
    return StreamingResponse(get_openai_generator(prompt), media_type='text/event-stream')

Hopefully by now you can get the OpenAI stream rendered on your screen! Shoot me a comment if you encounter any issues. Happy coding 🎉



Huan Xu

MSCS@GaTech. Interested in accessible ML inference. Building www.baynana.co, an AI-powered resume supercharger.