Chain reaction: How to create an observable frontend workflow using LlamaIndex and Chainlit

Tituslhy
MITB For All
Published in
8 min read4 days ago

On LlamaIndex

LlamaIndex is an open-source Python library that provides a modular and extensible framework for building and orchestrating Large Language Models (LLMs) and other AI components. At its core, LlamaIndex aims to make it easier to develop and deploy sophisticated AI applications.

On LlamaIndex’s Workflow Abstraction:

LlamaIndex recently launched its “Workflow” abstraction earlier this month (blog post here). Workflows leverage on the “event-driven” paradigm to develop flexible, customized LLM systems.

In a nutshell, a workflow comprises many steps that take in events and create events. A user asking a question triggers a StartEvent in the Workflow, and the Workflow object triggers all steps that take in the StartEvent as an input, creating other events which then trigger other steps taking these respective events. The Workflow only terminates when a StopEvent is triggered.

Workflows are also asynchronous! So you can quickly ship from notebook to production.

On Chainlit applications:

Chainlit is a fantastic library for creating ChatGPT-like LLM app interfaces. It has Python and React modules so it’s good to be used by data scientists and full-stack developers to create beautiful applications. Like LlamaIndex, the code mileage of this library is amazing. In Python, all you need to do is code 2 decorated functions: @cl.on_chat_start and @cl.on_message to tell Chainlit how to handle app start and message events. It also has this abstraction known as a “chain of thought”, where functions decorated with @cl.step are displayed as a “chain of thought” object.

So what happens when we add this decorator on top of each Workflow step? Let me show you what I mean. In this example, we’ll build a simple hybrid LLM agent system using LlamaIndex’s Workflow abstraction and Chainlit.

The Simple Hybrid LLM App

For every question, we’ll ask the LLM to use Tavily AI’s search tool (another brilliant tool btw) to answer the question with an Internet search and get the LLM to answer the user’s question without searching. We then consolidate our responses by getting the LLM to decide which answer better answers the question — if it’s both, it’ll summarize both answers, otherwise, it’ll just return the better answer.

Let’s start by importing the modules we need and setting everything up. You will need a TavilyAPI key and OpenAI API key stored in a .env to follow along.

import chainlit as cl

import os

from llama_index.core.agent import FunctionCallingAgentWorker
from llama_index.core.base.llms.types import ChatMessage, MessageRole
from llama_index.core.chat_engine import SimpleChatEngine
from llama_index.core.llms import LLM
from llama_index.core.workflow import (
Workflow,
Context,
Event,
StartEvent,
StopEvent,
step
)
from llama_index.llms.openai import OpenAI

## Utility function to draw out the workflow
# from llama_index.utils.workflow import (
# draw_all_possible_flows
# )
from llama_index.tools.tavily_research import TavilyToolSpec

from typing import Optional, Annotated, List, Any

llm = OpenAI(model="gpt-4o-mini")

Chainlit automatically loads environment variables stored in the .env file on launch so there’s no need to explicitly load it.

Now let’s define the Tavily tool for our LLM to use!

### Define tools
search_tool_spec = TavilyToolSpec(api_key=os.getenv("TAVILY"))
search_tools = search_tool_spec.to_tool_list()

That was fast. You can thank LlamaIndex for that ❤

Now let’s define the events that are salient in our workflow

class SearchEvent(Event):
"""Requires the LLM to do an online search to answer the question"""
query: Annotated[str, "The user's query"]

class AnswerEvent(Event):
"""Allows the LLM to answer the question without searching"""
query: Annotated[str, "The user's query"]

class ResponseEvent(Event):
"""Collects LLM response"""
query: Annotated[str, "The user's query"]
answer: Annotated[str, "The LLM's response"]

Hold on to your questions — I’ll answer them once I define the workflow

class MixtureOfAnswers(Workflow):
def __init__(
self,
*args: Any,
llm: Optional[LLM] = llm,
**kwargs: Any
):
"""Class constructor. Takes in an llm instance and constructs
1. A function calling agent with search tools
2. A simple chat engine instance
3. A common memory instance across the workflow

Args:
llm (Optional[LLM], optional): LLM instance. Defaults to Settings.llm.
"""
super().__init__(*args, **kwargs)
self.llm = llm
self.search_agent_worker = FunctionCallingAgentWorker.from_tools(
tools = search_tools,
llm = self.llm
)
self.search_agent = self.search_agent_worker.as_agent()
self.answer_without_search_engine = SimpleChatEngine.from_defaults(
llm = self.llm
)
self.history: List[ChatMessage] = []

@cl.step(type="llm")
@step()
async def route_to_llm(
self,
ev: StartEvent
) -> SearchEvent | AnswerEvent:
"""Generates a search event and an answer event once given a start event"""

## Update memory
self.history.append(
ChatMessage(
role = MessageRole.USER,
content = ev.query
)
)

## Routes to both events. But you can also write a router component to decide
## which event to route to.
self.send_event(SearchEvent(query = ev.query))
self.send_event(AnswerEvent(query = ev.query))

@cl.step(type="tool")
@step()
async def search_and_answer(
self,
ev: SearchEvent
) -> ResponseEvent:
"""Uses the tavily search tool to answer the question"""

## Synthesize response
response = await self.search_agent.achat(
ev.query,
chat_history = self.history
)

## [OPTIONAL] Show intermediate response in the frontend
# await cl.Message(content="ANSWER WITH SEARCH: " + str(response)).send()

## Update memory
self.history.append(
ChatMessage(
role = MessageRole.ASSISTANT,
content = "ANSWER WITH SEARCH: " + str(response)
)
)

return ResponseEvent(query = ev.query, answer = str(response))

@cl.step(type="llm")
@step()
async def simply_answer(
self,
ev: AnswerEvent
) -> ResponseEvent:
"""Uses the LLM to simple answer the question"""

## Synthesize response
response = await self.answer_without_search_engine.achat(
ev.query,
chat_history = self.history
)

## [OPTIONAL] Show intermediate response in the frontend
# await cl.Message(content="ANSWER WITHOUT SEARCH: " + str(response)).send()

## Update memory
self.history.append(
ChatMessage(
role = MessageRole.ASSISTANT,
content = "ANSWER WITHOUT SEARCH: " + str(response)
)
)

return ResponseEvent(query = ev.query, answer = str(response))

@cl.step(type="llm")
@step()
async def compile(
self,
ctx: Context,
ev: ResponseEvent
) -> StopEvent:
"""Compiles and summarizes answers from all response events"""

## There are 2 response events from routing to 2 different agents. This can
## also be a dynamic number of events.
ready = ctx.collect_events(ev, [ResponseEvent] * 2)

if ready is None:
return None

response = await self.llm.acomplete(
f"""
A user has asked us a question and we have responded accordingly using a
search tool and without using a search tool. Your job is to decide which
response best answered the question and summarize the response into a crisp
reply. If both responses answered the question, summarize both responses
into a single answer.

The user's query was: {ev.query}

The responses are:
{ready[0].answer} &
{ready[1].answer}
"""
)

## Update memory
self.history.append(
ChatMessage(
role = MessageRole.ASSISTANT,
content = "FINAL ANSWER: " + str(response)
)
)

return StopEvent(result = str(response))

LlamaIndex’s Workflow objects don’t really need a class constructor, but we can still use one to store class specific objects and their states. We initialize the search agent, the answer without searching engine (using LlamaIndex’s SimpleChatEngine abstraction) and the Workflow’s main LLM.

Each step in the Workflow are registered using LlamaIndex’s @step decorator, and then further registered as a Chainlit chain of thought object using the @cl.step decorator. How easy is that!

We also instantiate a chat history object and synchronize it throughout the ongoing conversation. That way your LLM can respond with fuller context which includes the responses by other LLMs/agents.

Do remember to awaitin front of every asynchronous call, and add the ain front of each LlamaIndex LLM invoke function ( achat instead of chatfor example) for asynchronous invoking. Otherwise your app just hangs and you’ll wonder what you did wrong.

Do also note that LlamaIndex’s Workflow object does type checking — so type annotations are important. They’re a best practice anyway.

When all is said and done, here’s what our Workflow looks like:

Just use LlamaIndex’s `draw_all_possible_flows` function to do this

Now let’s elaborate on the Workflow’s steps!

When the user asks a question, the Workflow registers a StartEvent and looks amongst its steps for step/s that consume this event which in this case is the route_to_llm step.

This step then generates a SearchEvent and an AnswerEvent, which then triggers the search_and_answer and simply_answer steps — which then invoke the agent and the chat engine to answer the question. These steps then return an ResponseEvent which triggers the final compile step to consolidate the replies.

Did I answer your questions?

Perhaps all except for one (or more — depending on which LLM library you like to use):

  1. Why so many “events” instead of just one “State” like Langgraph?
  2. Why not just use a “manager_llm” to decide routing like crewAI and Autogen?
  3. Where’s the Chainlit app?

To questions #1 and #2, there are many different state-of-the-art LLM orchestration libraries — each with its own strengths and weaknesses. Coding out multiple different events confers more flexibility and allows for a more “declarative” way of coding. Plus we also know that using a manager_llm is sometimes a wild card — they can be quite frustrating when routing to the wrong agents. But that isn’t to say that LlamaIndex is the best — though it is my favorite.

Oh right the Chainlit app.

The final piece of code is:

### Define the app - with just a few lines of code
@cl.on_chat_start
async def on_chat_start():
app = MixtureOfAnswers(
verbose = True,
timeout = 6000
) #The app times out if it runs for 6000s without any result
cl.user_session.set("app", app)
await cl.Message("Hello! Ask me anything!").send()

@cl.on_message
async def on_message(message: cl.Message):
app = cl.user_session.get("app")
result = await app.run(query = message.content)
await cl.Message(content = result).send()

Yup it really is that easy. You can thank Chainlit for that ❤

Worked like a charm

Each step in the workflow is registered as a chainlit step in its ‘chain of thought’ abstraction. If you want, you can even send out the responses of the LLM in the intermediate steps in the frontend of the app — this creates a “group chat” experience. I’ve included the code for that in the main body as well (commented out).

At point of writing I’ve raised a pull request with my codes in Chainlit’s Cookbook GitHub repository. Hopefully they’ll accept it soon!

Conclusion

In conclusion, this post demonstrated how to create a simple hybrid LLM agent system using LlamaIndex’s Workflow abstraction and Chainlit. By combining the strengths of both libraries, we can build powerful and flexible AI applications. The example showcased here highlights the ease of integrating LlamaIndex’s event-driven workflows with Chainlit’s intuitive interface and “chain of thought” abstraction.

With this approach, developers can quickly prototype and deploy AI systems that leverage multiple LLMs and tools. As the field of AI continues to evolve, the ability to orchestrate and combine different models and libraries will become increasingly important. By exploring the possibilities of LlamaIndex and Chainlit, we can unlock new potential for building innovative and effective AI solutions.

Disclaimer: All opinions and interpretations are that of the writer, and not of MITB. I declare that I have full rights to use the contents published here, and nothing is plagiarized. I declare that this article is written by me and not with any generative AI tool such as ChatGPT. I declare that no data privacy policy is breached, and that any data associated with the contents here are obtained legitimately to the best of my knowledge. I agree not to make any changes without first seeking the editors’ approval. Any violations may lead to this article being retracted from the publication.

--

--