Featured
Introducing Graphite — An Event Driven AI Agent Framework
Graphite is an open-source framework for building domain-specific AI assistants using composable agentic workflows. It offers a highly extensible platform that can be tailored to unique business requirements, enabling organizations to develop custom workflows suited to their specific domains.
We’ve mentioned this product in our previous posts, and we’re proud to share that we have now open sourced Graphite on GitHub! Check it out and give us a Star ⭐ there if you like it!
You might be wondering, “With AI solutions like ChatGPT, Claude, and various agentic platforms and frameworks already available, why create another one?” The short answer: we identified a gap in solving real-world problems with AI tools. Many generic agents — like typical ReAct or CoT agents — struggle with mission-critical tasks where mistakes can be costly.
Graphite provides a straightforward and controllable framework for building workflows, allowing large language models (LLMs) not just to reason and plan, but to operate within a defined, auditable, and easily restorable workflow. It also includes essential features such as observability, idempotency, and auditability out of the box.
In this post, we’ll introduce Graphite briefly, from its architecture to its key features, and provide a simple example: using Graphite to build a Know-Your-Client AI assistant.
The Architecture: Simple, Powerful, and Composable
Graphite is structured into three conceptual layers — Assistants, Nodes, and Tools:
- Assistants orchestrate the workflow and manage the conversation state.
- Nodes encapsulate discrete logic, each dedicated to a specific function (e.g., an LLM call or function execution).
- Tools are pure functions responsible for executing tasks, such as calling an API or running a Python function.
Additionally, Graphite employs the event sourcing pattern to record every state change. Whenever an Assistant, Node, or Tool is invoked, responds, or fails, a corresponding event is generated and stored in the event store.
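The pattern itself can be sketched in a few lines, independent of the framework — the names `Event` and `EventStore` here are illustrative, not Graphite's actual event store API:

```python
import time
import uuid
from dataclasses import dataclass, field

@dataclass
class Event:
    """One immutable record: who acted and what happened."""
    source: str   # e.g. "assistant", "node", "tool"
    kind: str     # "invoke", "respond", or "fail"
    payload: dict
    event_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    timestamp: float = field(default_factory=time.time)

class EventStore:
    """Append-only log: state is derived by reading events, never mutated in place."""
    def __init__(self):
        self._events = []

    def record(self, event: Event) -> None:
        self._events.append(event)

    def history(self, source=None):
        return [e for e in self._events if source is None or e.source == source]

store = EventStore()
store.record(Event(source="node", kind="invoke", payload={"input": "hi"}))
store.record(Event(source="node", kind="respond", payload={"output": "hello"}))
print(len(store.history("node")))  # → 2
```

Because the log is append-only, any past state can be reconstructed by replaying it — which is exactly what makes the auditability and restorability features below possible.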
The command pattern cleanly separates request initiators from executors through well-defined Command objects and handlers. These Commands carry all the necessary context, enabling nodes to invoke tools in a self-contained and straightforward manner.
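Stripped down to its essence, the separation looks something like this — a hedged sketch, not Graphite's real `Command` classes:

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class FunctionCommand:
    """Carries everything the executor needs; the initiator never touches the tool directly."""
    tool: Callable[..., str]
    kwargs: dict

def handle(command: FunctionCommand) -> str:
    """The handler is the only place the tool is actually invoked."""
    return command.tool(**command.kwargs)

def register_client(name: str, email: str) -> str:
    return f"User {name}, email {email} has been registered."

cmd = FunctionCommand(tool=register_client,
                      kwargs={"name": "Craig Li", "email": "craig@example.com"})
print(handle(cmd))  # → User Craig Li, email craig@example.com has been registered.
```

The node that builds the command does not need to know how the tool runs; the handler does not need to know who asked.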
The nodes coordinate through a lightweight Pub/Sub orchestration mechanism:
- Workflow orchestrates interactions among nodes using a Pub/Sub pattern with in-memory message queuing.
This architecture unlocks plug-and-play modularity — you can build workflows like Lego bricks, add or remove nodes, and isolate failures with surgical precision.
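A minimal in-memory broker conveys the idea (again, an illustrative sketch rather than Graphite's internal implementation):

```python
from collections import defaultdict

class Broker:
    """Minimal in-memory pub/sub: handlers subscribe to topics; publishing fans out."""
    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic: str, handler) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, message) -> None:
        for handler in self._subscribers[topic]:
            handler(message)

broker = Broker()
received = []
broker.subscribe("user_info_extract_topic", received.append)
broker.publish("user_info_extract_topic", {"text": "John Smith, john@example.com"})
print(received)  # → [{'text': 'John Smith, john@example.com'}]
```

Adding a node is just another `subscribe`; removing one touches nothing else in the graph — that is where the Lego-brick modularity comes from.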
The Features — What Makes Graphite Shine
The core design principles introduced above that set Graphite apart from other agent frameworks are:
- A Simple 3-Layer Execution Model
Three distinct layers — assistant, node, and tool — manage execution, while a dedicated workflow layer oversees orchestration.
- Pub/Sub Event-Driven Orchestration
Communication relies on publishing and subscribing to events, ensuring a decoupled, modular flow of data throughout the system.
- Events as the Single Source of Truth
All operational states and transitions are recorded as events, providing a uniform way to track and replay system behavior if needed.
Combining these elements, Graphite provides a production-grade AI application framework capable of operating reliably at scale, handling failures gracefully, and maintaining user and stakeholder trust. Four essential capabilities form the backbone of this approach:
- Observability
Complex AI solutions involve multiple steps, data sources, and models. Graphite’s event-driven architecture, logging, and tracing make it possible to pinpoint bottlenecks or errors in real time, ensuring that each component’s behavior is transparent and measurable.
- Idempotency
Asynchronous workflows often require retries when partial failures occur or network conditions fluctuate. Graphite’s design emphasizes idempotent operations, preventing pub/sub data duplication or corruption when calls must be repeated.
- Auditability
By treating events as the single source of truth, Graphite automatically logs every state change and decision path. This level of detailed record-keeping is indispensable for users working in regulated sectors or who need full traceability for debugging and compliance.
- Restorability
Long-running AI tasks risk substantial rework if they fail mid-execution. In Graphite, checkpoints and event-based playback enable workflows to resume from the precise point of interruption, minimizing downtime and maximizing resource efficiency.
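Idempotency and restorability both fall out of the event log: if processing is keyed by event ID, replaying the log is always safe. A small sketch (the class name and shape are illustrative, not Graphite's API):

```python
class IdempotentConsumer:
    """Processing is keyed by event_id, so duplicate delivery or full replay is harmless."""
    def __init__(self):
        self.seen = set()
        self.state = []

    def process(self, event_id: str, payload) -> None:
        if event_id in self.seen:  # duplicate delivery or replay — skip safely
            return
        self.seen.add(event_id)
        self.state.append(payload)

log = [("e1", "extract"), ("e2", "ask_user"), ("e1", "extract")]  # e1 delivered twice
consumer = IdempotentConsumer()
for event_id, payload in log:
    consumer.process(event_id, payload)
print(consumer.state)  # → ['extract', 'ask_user']

# Restorability: a fresh consumer replaying the same log reaches the same state.
restored = IdempotentConsumer()
for event_id, payload in log:
    restored.process(event_id, payload)
assert restored.state == consumer.state
```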
Together, these capabilities — observability, idempotency, auditability, and restorability — distinguish Graphite as a framework for building robust and trustworthy AI applications.
Show Your Code — KYC Assistant
When designing an AI-based workflow, keep in mind that large language models introduce uncertainty. It’s helpful to follow Occam’s razor: the simpler the workflow, the better.
Design the workflow
Suppose you want to create a “Know Your Client (KYC)” assistant for a gym registration process, with human-in-the-loop (HITL) functionality. The user must provide their full name and email address to complete the registration workflow; if anything is missing, the workflow pauses and asks the client for more information. A real-world problem would of course need more information, but we simplify it here for the demo.
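In place of a picture, the topology can be written down as plain data — each node with the topics it subscribes to and publishes to, using the same names as the builder code in the next section:

```python
# node → (subscribed topics, published topics)
workflow_graph = {
    "user_info_extract_node": (["agent_input_topic", "human_request_topic"],
                               ["user_info_extract_topic"]),
    "action_node":            (["user_info_extract_topic"],
                               ["hitl_call_topic", "register_user_topic"]),
    "human_request_node":     (["hitl_call_topic"],
                               ["human_request_topic"]),
    "register_user_node":     (["register_user_topic"],
                               ["register_user_respond_topic"]),
    "user_reply_node":        (["register_user_respond_topic"],
                               ["agent_output_topic"]),
}
```

Note the loop: the HITL node publishes back to `human_request_topic`, which the extraction node also subscribes to — that is how the workflow pauses and resumes when information is missing.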
Build the assistant
First, install Graphite via pip:
pip install grafi
From the workflow graph above, we will need to add the following components:
7 topics:
- agent input topic (framework provided)
from grafi.common.topics.topic import agent_input_topic
- user info extract topic
user_info_extract_topic = Topic(name="user_info_extract_topic")
- human call topic
hitl_call_topic = Topic(
name="hitl_call_topic",
condition=lambda msgs: msgs[-1].tool_calls[0].function.name
!= "register_client",
)
- human in the loop topic (framework provided)
from grafi.common.topics.human_request_topic import human_request_topic
- register user topic
register_user_topic = Topic(
name="register_user_topic",
condition=lambda msgs: msgs[-1].tool_calls[0].function.name
== "register_client",
)
- register user response topic
register_user_respond_topic = Topic(name="register_user_respond")
- agent output topic (framework provided)
from grafi.common.topics.output_topic import agent_output_topic
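The interesting part is the pair of conditional topics: the same message goes to whichever topic's condition accepts it. A hypothetical mini-version of that routing, with a plain dict standing in for the real `Message` objects the conditions receive:

```python
# A message is routed to every topic whose condition accepts it.
def route(msgs, topics):
    return [name for name, condition in topics.items() if condition(msgs)]

# Mirrors hitl_call_topic / register_user_topic above: routing keys off
# the name of the tool call the action LLM chose.
topics = {
    "hitl_call_topic":     lambda msgs: msgs[-1]["tool_name"] != "register_client",
    "register_user_topic": lambda msgs: msgs[-1]["tool_name"] == "register_client",
}

print(route([{"tool_name": "request_client_information"}], topics))  # → ['hitl_call_topic']
print(route([{"tool_name": "register_client"}], topics))             # → ['register_user_topic']
```

Because the two conditions are mutually exclusive, the action node's output always flows down exactly one branch.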
5 nodes:
- LLM Node [User info extract node] to extract name and email from user input
user_info_extract_node = (
LLMNode.Builder()
.name("ThoughtNode")
.subscribe(
SubscriptionBuilder()
.subscribed_to(agent_input_topic)
.or_()
.subscribed_to(human_request_topic)
.build()
)
.command(
LLMResponseCommand.Builder()
.llm(
OpenAITool.Builder()
.name("ThoughtLLM")
.api_key(self.api_key)
.model(self.model)
.system_message(self.user_info_extract_system_message)
.build()
)
.build()
)
.publish_to(user_info_extract_topic)
.build()
)
- LLM Node [action] to create action given extracted information
action_node = (
LLMNode.Builder()
.name("ActionNode")
.subscribe(user_info_extract_topic)
.command(
LLMResponseCommand.Builder()
.llm(
OpenAITool.Builder()
.name("ActionLLM")
.api_key(self.api_key)
.model(self.model)
.system_message(self.action_llm_system_message)
.build()
)
.build()
)
.publish_to(hitl_call_topic)
.publish_to(register_user_topic)
.build()
)
- Function Tool Node [human-in-the-loop] to ask user for missing information if any
human_request_function_call_node = (
LLMFunctionCallNode.Builder()
.name("HumanRequestNode")
.subscribe(hitl_call_topic)
.command(
FunctionCallingCommand.Builder()
.function_tool(self.hitl_request)
.build()
)
.publish_to(human_request_topic)
.build()
)
- Function Tool Node [register user] to register the client
register_user_node = (
LLMFunctionCallNode.Builder()
.name("FunctionCallRegisterNode")
.subscribe(register_user_topic)
.command(
FunctionCallingCommand.Builder()
.function_tool(self.register_request)
.build()
)
.publish_to(register_user_respond_topic)
.build()
)
- LLM Node [response to user] to draft the final response to the user
user_reply_node = (
LLMNode.Builder()
.name("LLMResponseToUserNode")
.subscribe(register_user_respond_topic)
.command(
LLMResponseCommand.Builder()
.llm(
OpenAITool.Builder()
.name("ResponseToUserLLM")
.api_key(self.api_key)
.model(self.model)
.system_message(self.summary_llm_system_message)
.build()
)
.build()
)
.publish_to(agent_output_topic)
.build()
)
And here is the source code — KYC assistant
Then let’s write some simple code to test it:
import json
import uuid
from kyc_assistant import KycAssistant
from grafi.common.decorators.llm_function import llm_function
from grafi.common.models.execution_context import ExecutionContext
from grafi.common.models.message import Message
from grafi.tools.functions.function_tool import FunctionTool
class ClientInfo(FunctionTool):
@llm_function
def request_client_information(self, question_description: str):
"""
Requests client input for personal information based on a given question description.
"""
return json.dumps({"question_description": question_description})
class RegisterClient(FunctionTool):
@llm_function
def register_client(self, name: str, email: str):
"""
Registers a user based on their name and email.
"""
return f"User {name}, email {email} has been registered."
user_info_extract_system_message = """
"You are a strict validator designed to check whether a given input contains a user's full name and email address. Your task is to analyze the input and determine if both a full name (first and last name) and a valid email address are present.
### Validation Criteria:
- **Full Name**: The input should contain at least two words that resemble a first and last name. Ignore common placeholders (e.g., 'John Doe').
- **Email Address**: The input should include a valid email format (e.g., example@domain.com).
- **Case Insensitivity**: Email validation should be case insensitive.
- **Accuracy**: Avoid false positives by ensuring random text, usernames, or partial names don’t trigger validation.
- **Output**: Respond with Valid if both a full name and an email are present, otherwise respond with Invalid. Optionally, provide a reason why the input is invalid.
### Example Responses:
- **Input**: "John Smith, john.smith@email.com" → **Output**: "Valid"
- **Input**: "john.smith@email.com" → **Output**: "Invalid - Full name is missing"
- **Input**: "John" → **Output**: "Invalid - Full name and email are missing"
Strictly follow these validation rules and do not assume missing details."
"""
def get_execution_context():
return ExecutionContext(
conversation_id="conversation_id",
execution_id=uuid.uuid4().hex,
assistant_request_id=uuid.uuid4().hex,
)
def test_kyc_assistant():
execution_context = get_execution_context()
assistant = (
KycAssistant.Builder()
.name("KycAssistant")
.api_key(
"YOUR_OPENAI_API_KEY"
)
.user_info_extract_system_message(user_info_extract_system_message)
.action_llm_system_message(
"Select the most appropriate tool based on the request."
)
.summary_llm_system_message(
"Response to user with result of registering. You must include 'registered' in the response if succeed."
)
.hitl_request(ClientInfo(name="request_human_information"))
.register_request(RegisterClient(name="register_client"))
.build()
)
while True:
# Initial User Input
user_input = input("User: ")
input_data = [Message(role="user", content=user_input)]
output = assistant.execute(execution_context, input_data)
responses = []
for message in output:
try:
content_json = json.loads(message.content)
responses.append(content_json["question_description"])
except json.JSONDecodeError:
responses.append(message.content)
respond_to_user = " and ".join(responses)
print("Assistant:", respond_to_user)
if "registered" in output[0].content:
break
if __name__ == "__main__":
test_kyc_assistant()
Test, Observe, Debug, and Improve
Graphite uses OpenTelemetry and Arize’s OpenInference to trace and observe the assistant’s behaviour.
To capture the traces locally, we can start Phoenix:
phoenix serve
Here is an example of a conversation between the user and the assistant:
> User: Hi, I'd like to sign up for your gym. Could you help me with the process?
> Assistant: Please provide your full name and email address to sign up for the gym.
> User: my name is Craig Li, and here is my email: craig@example.com
> Assistant: Congratulations, Craig! You are now registered at our gym. If you have any questions or need assistance, feel free to ask!
Looks good. But if you push the boundaries of the tests, you will hit an error quickly. Like this:
> User: Hi, how are you today?
An error occurs, and when we debug with Phoenix, we can pinpoint its exact location:
The issue originates from the Action LLM. Given the input, it should select the appropriate tool, but instead, it generates a plain string response.
To fix this, we can provide clear instructions: if the user asks a question unrelated to gym registration, the LLM should use the request_client_information tool to respond politely and ask if they need assistance with gym registration.
So let’s update the action LLM system prompt:
.action_llm_system_message(
"Select the most appropriate tool based on the request. If the user asks a question unrelated to gym registration, use the `request_client_information` tool to respond politely and ask if they need assistance with gym registration."
)
and try again:
> User: Hi, how are you today?
> Assistant: I'm here to assist you with gym registration. Could you please provide your full name and email?
You’ve now built an agent that can assist with gym registrations! You can find the assistant’s code at this link and an example is available here.
Summary
Graphite is an open-source framework for building domain-specific AI assistants through composable, event-driven workflows. With loosely coupled components — assistants, nodes, tools, and workflows — Graphite enables modular design, where events serve as the single source of truth. Its architecture supports complex logic like LLM calls, function execution, and RAG retrieval, all coordinated via dedicated pub/sub topics and commands. Built-in support for observability, idempotency, auditability, and restorability makes it production-ready, allowing workflows to scale, recover, and adapt to compliance needs. Whether you’re building conversational agents or automated pipelines, Graphite offers a flexible, extensible foundation for real-world AI systems.