Langchain/Openai Streaming 101 in Python

Problem

I wasn’t satisfied with the long latency of the text output in my project, where I ask an AI to write like waitbutwhy. So I decided to learn how to stream the text and wrap it in a streamable response in a backend API.

Background Knowledge: Event-Driven API

To understand the streaming code for Langchain and Openai, we first need to understand some basic but important concepts behind event-driven APIs.

Traditional HTTP requests

The client (e.g. your front-end browser) requests some information from a web API, for instance, some data stored in a MongoDB database. The web API collects the data and sends it back to the client to use.

If you want to do streaming, this model is quite problematic, because the client needs to keep polling the server with new requests, which is very inefficient.
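To see why, this is roughly what naive polling looks like. It is a minimal sketch, assuming a hypothetical /api/messages endpoint; it is not from the original project.

import time
import requests

while True:
    # One full request/response cycle per poll, even when nothing has changed
    resp = requests.get("https://example.com/api/messages")  # hypothetical URL
    print(resp.json())
    time.sleep(1)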

Hence, there are three types of event-driven APIs that solve this problem: webhooks, WebSockets, and HTTP streaming.

Webhooks: a phone number between two applications

The first solution is a webhook. You can imagine it as a phone call between two applications.

  • Application A wants to get notified whenever a certain event happens in Application B. Instead of Application A constantly checking for updates or polling Application B, it sets up a webhook.
  • Application A provides a URL (usually called a webhook URL) to Application B. This URL acts as a phone number that Application B can use to reach Application A.
  • When the desired event occurs in Application B, it makes an HTTP request to the webhook URL provided by Application A. It’s like Application B making a phone call to Application A and saying, “Hey, this event just happened!”
  • Application A receives the HTTP request from Application B, and it can then process the information or perform any required actions based on the event that occurred in Application B. It’s like Application A answering the phone call and listening to the information provided by Application B.

For instance, webhooks can be a good solution for handling email bounce-backs. You don’t want to keep sending requests to check whether every single email bounced, and the number of emails that bounce back is usually much smaller than the number of emails you send out.

The webhook system in this case works as follows: when a bounce-back notification is received at your webhook endpoint, you extract details such as the recipient's email address, the bounce reason, error codes, and any other relevant information. You can then take appropriate action based on this information, such as updating your email list, marking the recipient as invalid, or analyzing the cause of the bounce. The use case for webhooks is keeping the client up to date with certain events.
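Here is a minimal sketch of what such a webhook endpoint could look like, assuming a FastAPI app; the route path and payload field names are hypothetical and depend on your email provider.

from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/webhooks/email-bounce")  # the URL you hand to the email provider
async def email_bounce(request: Request):
    event = await request.json()
    recipient = event.get("recipient")  # hypothetical field names
    reason = event.get("reason")
    # e.g. mark the address as invalid in your database here
    return {"status": "received"}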

Websockets: A hotline during the Cold War

WebSockets are a communication protocol that allows for real-time, bidirectional communication between a client (typically a web browser) and a server. In simple terms, you can think of WebSockets as a continuous and interactive conversation between the client and the server. The client and the server perform a handshake to set up bidirectional communication between each other, like a hotline during the Cold War.

Here’s how Websockets work:

  1. Establishing a connection: Initially, the client sends a special HTTP request to the server to initiate a WebSocket connection. If the server supports Websockets, it responds with a confirmation to upgrade the connection to a WebSocket connection.
  2. Open connection: Once the connection is established, the client and the server can send messages to each other in a persistent manner. This means that they can continuously exchange data without having to initiate a new connection for every message.
  3. Real-time bidirectional communication: Both the client and the server can send messages to each other at any time. It’s like having a telephone line where both parties can talk and listen simultaneously. This bidirectional communication allows for real-time updates, instant messaging, live notifications, and other interactive features.
  4. Efficient and lightweight: Unlike traditional HTTP requests that require the client to send a request and the server to send a response for every interaction, WebSockets provide a persistent connection that eliminates the need for repetitive handshakes. This makes WebSockets efficient and reduces unnecessary overhead.
  5. Graceful closure: When either the client or the server decides to close the WebSocket connection, they can send a closing message to the other party. This allows for a graceful closure of the connection, giving both sides the opportunity to perform necessary cleanup tasks before ending the conversation.

Websockets are widely used for building real-time applications such as chat systems, collaborative document editing, live streaming, online gaming, and more. They provide a seamless and efficient way for the client and the server to communicate and exchange data in a continuous and interactive manner. If you look at the code from Langchain, they are using WebSockets for their streaming responses.
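To make the handshake-and-exchange flow concrete, here is a minimal sketch of a WebSocket echo endpoint, assuming a FastAPI app; the route name and messages are illustrative, not taken from Langchain's code.

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()  # complete the upgrade handshake
    while True:  # the loop ends when the client disconnects
        data = await websocket.receive_text()  # client -> server
        await websocket.send_text(f"echo: {data}")  # server -> client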

WebSockets come with their own pros and cons. In short, the use case for WebSockets is when you need low-latency, two-way communication.

HTTP Streaming: Single-sided love from an admirer

HTTP streaming is a technique that allows a server to send data to a client continuously, in a streaming fashion, over a single HTTP connection. In simple terms, you can think of HTTP streaming as watching a video or listening to music in real time, where the content is delivered to you as it becomes available. One request from the client. The server responds, but the response is indefinite. You can imagine it as a single-sided love from your admirer. You don’t reciprocate, but they have an infinite amount of love for you.

There are two ways it can do this (a minimal SSE sketch follows the list):

  1. Chunked responses (mostly for backends): whenever new information comes in, such as one token (roughly one word), the server sends it to the client as a chunk.
  2. Server-sent events (SSE, for browsers; Twitter uses it, for example).
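Here is a minimal sketch of a server-sent events endpoint, assuming a FastAPI app; the route and event payloads are illustrative. SSE frames are plain text lines of the form "data: ...\n\n", which browsers can consume with the EventSource API.

import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def event_stream():
    for i in range(5):
        yield f"data: message {i}\n\n"  # one SSE frame per message
        await asyncio.sleep(1)

@app.get("/events")
async def events():
    return StreamingResponse(event_stream(), media_type="text/event-stream")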

Here’s how HTTP streaming works:

  1. A client sends a request: The client (typically a web browser) sends an HTTP request to the server, indicating its interest in receiving a continuous stream of data.
  2. The server starts sending data: Upon receiving the client’s request, the server starts sending the data back in chunks or as a continuous stream. It doesn’t wait for the entire response to be generated before sending it.
  3. The client receives and processes data: As the server sends the data in chunks, the client can start receiving and processing the received data immediately. This enables real-time updates and allows the client to display or act upon the received information as it arrives.
  4. The connection remains open: Unlike traditional HTTP requests where the connection is closed after the server sends the response, in HTTP streaming, the connection remains open. This allows the server to continue sending data to the client over the same connection.
  5. Continuous data delivery: The server keeps sending data to the client as it becomes available or as per a defined interval. This enables a continuous flow of data from the server to the client, similar to a stream of water flowing steadily.

HTTP streaming is commonly used in applications that require real-time updates, such as live sports scores, stock market tickers, social media feeds, or chat applications. It allows for immediate and continuous delivery of data, ensuring that the client receives the most up-to-date information without the need for frequent requests and responses.

It’s important to note that HTTP streaming requires support from both the server and the client to establish and maintain the streaming connection. The server must be capable of sending data in a streaming fashion, and the client must be able to handle and process the streaming data as it arrives.

HTTP streaming also has its pros and cons. In short, its use case is one-way delivery over a single long-lived connection.

To conclude, webhooks are useful for keeping the client up to date with certain events, WebSockets are for when you need low-latency, two-way communication, and HTTP streaming is for one-way delivery over a long-lived connection.

Streaming in Openai ChatGPT and Langchain in Python

After you understand the basics of Event-driven API, understanding the code and performing a streaming response is much easier.

Streaming in ChatGPT

Since I deployed my LLM model on Modal, I investigated their method of streaming through Openai GPT API. This is their sample code (the repo).

def stream_chat(prompt: str):
    import openai

    for chunk in openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    ):
        content = chunk["choices"][0].get("delta", {}).get("content")
        if content is not None:
            yield content

If you print out the chunk after setting stream=True, you will see that a sample chunk looks like this.

chunk {
    "id": "chatcmpl-7Z4pA8ROGam8fXAoJQv0putr9tBut",
    "object": "chat.completion.chunk",
    "created": 1688594380,
    "model": "gpt-3.5-turbo-0613",
    "choices": [
        {
            "index": 0,
            "delta": {
                "content": " SQL"
            },
            "finish_reason": null
        }
    ]
}

You can see that many chunks are streamed during creation, and you can find each token by selecting a choice, then delta, then content. Once you yield the content, you can wrap the generator in a FastAPI StreamingResponse.

@stub.function()
@web_endpoint()
def web(prompt: str):
    from fastapi.responses import StreamingResponse

    return StreamingResponse(stream_chat(prompt), media_type="text/html")

Additional side knowledge: Why do you use yield?

Think of a function as a machine that produces a list of things. Normally, when you use the machine, it gives you the whole list at once. But with the “yield” keyword, you can use the machine in a special way. Instead of getting the whole list at once, the machine gives you one thing at a time. Each time you ask for the next thing, the machine creates it and gives it to you. But here’s the interesting part: the machine remembers where it stopped and can continue making more things whenever you ask for them.

So, let’s say you have a machine that makes numbers. When you use the machine, it doesn’t give you all the numbers from 1 to 10 right away. Instead, when you ask for the next number, it creates it and gives it to you. And when you ask again, it creates the next number and gives it to you, and so on.

The “yield” keyword is like a special instruction you give to the machine, telling it to give you the next thing and remember where it stopped. It’s like saying, “Okay, machine, give me the next number, and when I ask again, continue where you left off and give me the next number after that.”

This way, you can use the machine to get numbers one at a time without needing to store all the numbers in your memory. It’s useful when you have a really long list or when you only need some of the numbers and don’t want to waste time and memory generating all of them.

In short, a function that uses yield returns a generator object that can be used to iterate over a sequence of values.
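Here is a minimal sketch of the “number machine” described above.

def make_numbers():
    for i in range(1, 11):
        yield i  # hand back one number, then pause here until asked again

gen = make_numbers()
print(next(gen))  # 1
print(next(gen))  # 2
print(list(gen))  # [3, 4, 5, 6, 7, 8, 9, 10]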

If you check the output after setting stream=True in Openai, you will also see that it is a generator object.

response = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": prompt}],
    stream=True,
)
print("response ", response)

Output

response <generator object EngineAPIResource.create.<locals>.<genexpr> at 0x7f8ad8605cf0>

Langchain callback: WebSocket

If you look at the source code from Langchain, you will see that they use Websocket to implement the streaming in their callback.

"""Callback handlers used in the app."""
from typing import Any, Dict, List
from langchain.callbacks.base import AsyncCallbackHandler
from schemas import ChatResponse

class StreamingLLMCallbackHandler(AsyncCallbackHandler):
"""Callback handler for streaming LLM responses."""
def __init__(self, websocket):
self.websocket = websocket
async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
resp = ChatResponse(sender="bot", message=token, type="stream")
await self.websocket.send_json(resp.dict())

class QuestionGenCallbackHandler(AsyncCallbackHandler):
"""Callback handler for question generation."""
def __init__(self, websocket):
self.websocket = websocket
async def on_llm_start(
self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
) -> None:
"""Run when LLM starts running."""
resp = ChatResponse(
sender="bot", message="Synthesizing question...", type="info"
)
await self.websocket.send_json(resp.dict())

Note: Callback handlers, also known as callback functions or callbacks, are functions that are passed as arguments to other functions or methods. These callbacks are then invoked or called by the receiving function or method to perform specific actions or handle events at the appropriate time.
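As a minimal sketch of that pattern in plain Python (the names and the fake LLM are illustrative, not Langchain's API):

def on_new_token(token: str) -> None:
    print(token, end="", flush=True)  # the action performed for each event

def fake_llm(prompt: str, callback) -> None:
    for token in prompt.split():  # pretend each word is a generated token
        callback(token + " ")  # invoke the callback as the event happens

fake_llm("streaming tokens one by one", on_new_token)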

To implement it, you also need to set streaming=True. This is the sample code Langchain provides (link).

from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

chat = ChatOpenAI(
    streaming=True, callbacks=[StreamingStdOutCallbackHandler()], temperature=0
)
resp = chat([HumanMessage(content="Write me a song about sparkling water.")])

Separately, if you set stream=True when making a request with the Requests library, Requests cannot release the connection back to the pool unless you consume all the data or call Response.close.
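For consuming a streaming endpoint with Requests, a minimal sketch (the URL is hypothetical) looks like this; the with block ensures the connection is released back to the pool when you are done.

import requests

with requests.get("https://example.com/stream", stream=True) as r:
    for chunk in r.iter_content(chunk_size=None, decode_unicode=True):
        print(chunk, end="", flush=True)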

In addition, there is a fastapi-async-langchain library you can use (link) to stream over HTTP and WebSocket.

In the end, I decided to use Streaming in ChatGPT and streamed out the response!

Note: If you run it from your terminal, you might want to check that your client (your terminal) isn't buffering or blocking the stream from your server (my server is on Modal). For instance, I disable buffering for my testing: curl --no-buffer --get --data-urlencode "user_input={your_input}" "{your_web_api}"

You can also view my Github code for the deployment: here.
