A2A Deep Dive: Getting Real-Time Updates from AI Agents
An Exploration of A2A Streaming via SSE
What is this about?
Ok then, let’s talk a bit more about Google’s Agent-to-Agent (A2A) protocol 🤗
I recently published a blog post on how to get started with the official A2A demo. In it we explored the capabilities of A2A and how it helps AI agents, potentially built with different frameworks, actually communicate with one another. We learnt about the AgentCard (i.e. the agent’s introduction) and how we can use tasks/send to give an agent a quick job and get a Message or Artifact back pretty quickly. That’s great for straightforward tasks.
But what happens when the work isn’t so quick? We know AI agents can sometimes take their time. Generating a detailed report, sifting through a large dataset, calling out to other services, maybe even waiting for a human approval aren’t typically instantaneous operations. If our client agent just sits there patiently after a tasks/send, our users might think the application has frozen, or vital system resources could be tied up unnecessarily.
This is precisely the problem A2A’s streaming mechanism is designed to solve. In this post, we’ll explore how we can use Server-Sent Events (SSE) combined with the A2A tasks/sendSubscribe method. This allows the agent doing the work (the A2A Server) to proactively send progress updates back to the agent that requested the task (the A2A Client). We’ll examine how this works under the bonnet and discuss the practicalities of building and hosting these streaming agents.
Why should we care?
If we stick with the basic tasks/send for agent tasks that take a bit of time, we run into some real practical headaches. Users don’t appreciate staring at unresponsive interfaces, and it makes our overall system feel sluggish and less intelligent. Without a way to get feedback during the process:
- Users get left in the dark: Is it working? Is it stuck? Did it crash? A waiting spinner tells them nothing useful.
- Things feel sluggish: The calling agent or UI is just frozen, waiting. It can’t do anything else or react to partial results.
- Complex interactions are hard: How do you model a back-and-forth conversation or a multi-stage process if you only get one final answer?
Using A2A’s streaming via tasks/sendSubscribe changes the game. It lets us build applications where:
- We can show real-time status (“Analysing document…”, “Generating draft…”, “Checking sources…”).
- We can stream results back piece by piece, like getting text from an LLM as it’s generated.
- Agents can give intermediate feedback before the whole task is finished.
Essentially, if our agents are doing more than just quick lookups, we really need to understand how tasks/sendSubscribe works to build quality applications.
How A2A Streaming Works
To address this, A2A offers a better approach. For handling these longer-running jobs and providing feedback along the way, we can use the tasks/sendSubscribe method. When our client agent uses this method instead of tasks/send, it’s effectively saying to the server agent: ‘Right, start this task for me, but importantly, keep this communication line open after you’ve received my initial request.’
Keeping this connection alive is the crucial difference. It creates a pathway for the server agent, while it’s busy working away in the background, to proactively push updates back to our waiting client agent whenever there’s something new to share — maybe a status update (“Still working on section 3…”), an intermediate result, or finally the completed task. The underlying web standard A2A uses to send these updates back over the open connection is Server-Sent Events (SSE). Let’s look at how that flow works in practice.
Let’s Trace the Journey:
Client Kicks Off: Before our A2A Client can even think about streaming, it needs to know if the target agent is capable and willing to stream updates. This critical information is advertised in the agent’s AgentCard, which the client typically fetches first (perhaps from the standard /.well-known/agent.json path or an agent registry). Inside the AgentCard JSON, we look specifically at the capabilities object. For instance, an agent designed for potentially longer lookups, like the currency exchange agent in the previous blog post, might explicitly state its support like this:
// Snippet from an AgentCard
"capabilities": {
  "streaming": true, // <-- This agent supports SSE streaming!
  "pushNotifications": false
  // ... other capabilities
},
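As a quick illustration, checking this flag programmatically could look like the following minimal sketch (a hypothetical helper; it assumes the card is served at the standard well-known path and uses httpx):
# Hypothetical helper: fetch an AgentCard and check for streaming support.
import httpx

async def supports_streaming(agent_base_url: str) -> bool:
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"{agent_base_url}/.well-known/agent.json")
        resp.raise_for_status()
        card = resp.json()
        # Default to False if the capabilities object or the flag is missing.
        return card.get("capabilities", {}).get("streaming", False)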
If our client sees "streaming": true in the fetched AgentCard, it knows it can proceed with a streaming request. It then makes its HTTP POST request to the agent's endpoint, but instead of using the tasks/send method, it uses tasks/sendSubscribe. It's important to remember that the actual task details provided in the params field (the TaskSendParams object containing the initial Message, a unique Task ID, sessionId, etc.) are structured exactly the same way whether we're using tasks/send or tasks/sendSubscribe. The only change is the method name itself, signalling the desire for a streaming connection:
// Client Request Payload (Simplified)
{
  "jsonrpc": "2.0",
  "method": "tasks/sendSubscribe", // The key change!
  "params": { // TaskSendParams structure
    "id": "task-podcast-123",
    "sessionId": "session-abc",
    "message": { /* ... initial message ... */ }
  },
  "id": "client-req-1" // JSON-RPC request ID
}
Server Accepts, Holds the Line: The A2A Server receives this request. It validates it, acknowledges it (usually with an initial HTTP 200 OK indicating the stream will start), and importantly, it keeps the underlying HTTP connection open. It then needs to start processing the actual task, typically by kicking off some background logic (we don’t want the main request handler to block!).
Server Sends Updates (SSE Events): Now, as the background task makes progress, the server sends updates back over that open connection. These updates are formatted as SSE message events. Each event’s data field contains a string which is a complete JSON-RPC response. Inside that JSON response’s result field, we find the specific A2A payload:
- TaskStatusUpdateEvent: This is our main tool for progress reporting. It tells the client the current Task.id, the updated TaskStatus (including the state like working, and often a Message with a TextPart describing the current step), and a crucial boolean flag final. While the task is ongoing, final is false.
// Example: Progress update
event: message
data: {
  "jsonrpc": "2.0",
  "id": "client-req-1",
  "result": {
    "id": "task-podcast-123",
    "status": {
      "state": "working",
      "message": {
        "role": "agent",
        "parts": [{"type": "text", "text": "Drafting introduction..."}]
      }
    },
    "final": false
  }
}
- TaskArtifactUpdateEvent: If the result itself is generated piece by piece (like streaming text from an LLM, or sending a large file in chunks), the server can send these chunks using TaskArtifactUpdateEvent. The Artifact object within this event uses flags like append: true (meaning “add this to the previous artifact chunk with the same index”) and lastChunk: true/false to help the client reconstruct the full artifact correctly.
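For illustration, an artifact chunk arriving over the stream might look like this (hypothetical values, wrapped in the same JSON-RPC structure as the status events above):
// Example: Artifact chunk (illustrative values)
event: message
data: {
  "jsonrpc": "2.0",
  "id": "client-req-1",
  "result": {
    "id": "task-podcast-123",
    "artifact": {
      "name": "podcast-script",
      "index": 0,
      "append": true,
      "lastChunk": false,
      "parts": [{"type": "text", "text": "...the next portion of the script..."}]
    }
  }
}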
Server Signals Completion: Once the background task finishes its work (either successfully or encountering an error), the server sends one final TaskStatusUpdateEvent. This event includes the terminal state (completed or failed) and, critically, sets the final: true flag. This tells the client “That’s all folks, no more updates for this task on this connection.”
// Example: Task completed successfully
event: message
data: {
  "jsonrpc": "2.0",
  "id": "client-req-1",
  "result": {
    "id": "task-podcast-123",
    "status": {
      "state": "completed",
      "message": {
        "role": "agent",
        "parts": [{"type": "text", "text": "Final script generated."}]
      }
    },
    "final": true
  }
}
Client Listens and Reacts: All this time, the client is listening on the open connection for these SSE message events. When one arrives, it needs to:
- Parse the data field as JSON to get the A2A JSON-RPC response.
- Check for errors in the response.
- Examine the result field to identify the event type (TaskStatusUpdateEvent or TaskArtifactUpdateEvent).
- Update its internal state or UI based on the received status or artifact data.
- Recognise the final: true flag and close the connection cleanly.
Implementation Considerations
This streaming model requires some careful thought when we build the client and server.
Server-Side Considerations
- Async Architecture: The web framework handling the A2A endpoint must be asynchronous (like FastAPI or Starlette, which the common.server.A2AServer uses). The core agent logic that performs the actual task needs to run without blocking the main server process handling the connection — think asyncio.create_task in Python or similar background execution models.
- Linking Background Task to Connection: We need a communication bridge. How does the background task, busy generating the report, send updates back to the specific code managing the open SSE connection for that particular client? A common pattern is to use an asyncio.Queue created for each unique tasks/sendSubscribe request. The background task puts updates onto this queue.
- SSE Generation: The endpoint handler typically returns an async generator. This special function loops, waits for items to appear on that internal queue, formats them into the correct SSE data: string (containing the A2A JSON-RPC response), and then yields them. The sse-starlette library provides a helper (EventSourceResponse) that works well with such generators (see the minimal sketch after this list).
- Handling Disconnections: Clients might disconnect unexpectedly. The server code, particularly the async generator yielding SSE events, needs to handle this gracefully (e.g., catching asyncio.CancelledError) to clean up resources and potentially signal the background task to stop, if possible.
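Before we look at the fuller TaskManager-based snippet below, here is the queue-plus-generator bridge stripped to its bare essentials — a minimal, hypothetical sketch using sse-starlette, with the A2A payload formatting elided:
# Minimal sketch of the queue-bridge pattern (names and payloads are illustrative).
import asyncio
import json
from sse_starlette.sse import EventSourceResponse

async def handle_send_subscribe(request_payload: dict) -> EventSourceResponse:
    queue: asyncio.Queue = asyncio.Queue()  # One queue per streaming request

    async def background_work():
        # The real agent logic goes here; it pushes updates onto the queue as it progresses.
        await queue.put({"state": "working", "final": False})
        await asyncio.sleep(1)  # Simulate slow work
        await queue.put({"state": "completed", "final": True})

    worker = asyncio.create_task(background_work())  # Do NOT await: the handler must return now

    async def event_generator():
        try:
            while True:
                update = await queue.get()
                # In real A2A this 'data' would be a full JSON-RPC response string.
                yield {"data": json.dumps(update)}
                if update.get("final"):
                    break
        except asyncio.CancelledError:
            # The client disconnected: stop the background work and re-raise.
            worker.cancel()
            raise

    return EventSourceResponse(event_generator())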
# Conceptual Python Server Snippet using the A2A common lib structure
# (Illustrates the pattern within an A2A Server/TaskManager)
import asyncio
import logging
from typing import Awaitable, Callable  # For type hinting the update callback

from sse_starlette.sse import EventSourceResponse
from common.types import (
    TaskStatusUpdateEvent, TaskState, TaskStatus, Message, TextPart,
    SendTaskStreamingResponse, InternalError, SendTaskStreamingRequest,
    TaskArtifactUpdateEvent,
)

# Assume self has methods from InMemoryTaskManager like setup_sse_consumer,
# enqueue_events_for_sse and dequeue_events_for_sse from the A2A common library.
logger = logging.getLogger(__name__)

# Assume 'agent_logic_processor' is our background async function. It takes an
# async 'send_update' callback to push A2A event objects back:
#
# async def agent_logic_processor(
#     task_id: str,
#     input_message: Message,
#     send_update: Callable[[TaskStatusUpdateEvent | TaskArtifactUpdateEvent], Awaitable[None]],
# ):
#     ... do work ...
#     await send_update(TaskStatusUpdateEvent(...))    # Send a status update
#     ... maybe generate an artifact part ...
#     await send_update(TaskArtifactUpdateEvent(...))  # Send an artifact chunk
#     ... finish ...
#     await send_update(TaskStatusUpdateEvent(..., final=True))  # Send the final status

async def on_send_task_subscribe(self, request: SendTaskStreamingRequest):
    """Handles a tasks/sendSubscribe request within our TaskManager."""
    task_params = request.params
    task_id = task_params.id
    request_id = request.id  # Original JSON-RPC request ID for correlation
    logger.info(f"Task {task_id}: Received sendSubscribe request (Req ID: {request_id}).")

    try:
        # We need a dedicated queue for this client's updates.
        # The base TaskManager class provides helpers for this.
        sse_event_queue = await self.setup_sse_consumer(task_id)
        logger.debug(f"Task {task_id}: SSE consumer queue set up.")
    except Exception as e:
        # If we can't even set up the queue, we must return an error immediately.
        logger.error(f"Task {task_id}: Error setting up SSE consumer: {e}")
        # The spec uses SendTaskStreamingResponse even for immediate errors.
        return SendTaskStreamingResponse(id=request_id, error=InternalError(message="SSE setup failed"))

    # This is the function our background task will call to send updates.
    # It puts the already-formatted A2A event object onto this connection's queue.
    async def send_update_to_queue(update_event: TaskStatusUpdateEvent | TaskArtifactUpdateEvent):
        logger.debug(f"Task {task_id}: Queuing update event of type {type(update_event)}")
        # The enqueue method is provided by the base TaskManager and handles subscribers.
        await self.enqueue_events_for_sse(task_id, update_event)

    # Now, we start the actual agent work in the background.
    # We MUST NOT await this call here, otherwise the handler blocks!
    logger.info(f"Task {task_id}: Starting background agent logic processor.")
    background_processing_task = asyncio.create_task(
        agent_logic_processor(task_id, task_params.message, send_update_to_queue)
    )

    # We might want a callback to log when the background task finishes or errors.
    def log_task_completion(task: asyncio.Task):
        if task.cancelled():
            logger.warning(f"Task {task_id}: Background task was cancelled.")
        elif task.exception():
            logger.error(f"Task {task_id}: Background task failed: {task.exception()}",
                         exc_info=task.exception())
            # Potentially enqueue a final FAILED event here if the task didn't do it.
        else:
            logger.info(f"Task {task_id}: Background task completed successfully.")

    background_processing_task.add_done_callback(log_task_completion)

    # Finally, we return the SSE response.
    # `dequeue_events_for_sse` (from the base TaskManager) is an async generator:
    # it waits for items on `sse_event_queue`, formats them into the JSON-RPC
    # response structure needed for the SSE 'data' field, and yields them.
    logger.info(f"Task {task_id}: Returning SSE generator to client.")
    # `sse-starlette` consumes this async generator to produce the streaming response.
    return EventSourceResponse(
        self.dequeue_events_for_sse(request_id, task_id, sse_event_queue)
    )
Client-Side Considerations
- SSE Library: We need a library to handle the SSE connection. For Python clients, httpx-sse is a good option. Browsers have EventSource built-in.
- Parsing Logic: We must remember that the data field of each SSE message contains a full JSON-RPC response string. We need to parse this JSON first, then check if it contains a result or an error. If it’s a result, we then parse that to get the specific A2A TaskStatusUpdateEvent or TaskArtifactUpdateEvent.
- Application State: We need logic to take the information from the parsed events and update our application’s state or the user interface.
- Artifact Handling: If the agent streams artifacts using TaskArtifactUpdateEvent and the append flag, our client needs code to collect these pieces and assemble the final result correctly once lastChunk: true is received.
- Connection Management: Our client needs to handle network errors during the stream and should close the connection cleanly when the final: true flag is seen or if an unrecoverable error occurs.
# Conceptual Python Client Snippet using httpx-sse and A2A common types
import json
import logging

import httpx
from httpx_sse import connect_sse
from common.types import (
    SendTaskStreamingResponse, TaskStatusUpdateEvent, TaskArtifactUpdateEvent,
)

logger = logging.getLogger(__name__)

# Dictionary to store incomplete artifact parts during streaming.
# Key: task_id, Value: Dict[artifact_index, List[Part]]
artifact_buffers = {}

async def consume_a2a_stream(url: str, a2a_request_payload: dict):
    """Consumes and processes an A2A SSE stream."""
    task_id_from_request = a2a_request_payload.get("params", {}).get("id", "unknown_task")
    logger.info(f"Attempting to connect SSE stream to {url} for task {task_id_from_request}")
    artifact_buffers[task_id_from_request] = {}  # Initialise the buffer for this task
    try:
        async with httpx.AsyncClient(timeout=None) as client:
            # Establish the SSE connection
            async with connect_sse(client, "POST", url, json=a2a_request_payload) as event_source:
                logger.info(f"SSE connection established to {url}")
                # Process events as they arrive
                async for sse in event_source.aiter_sse():
                    logger.debug(f"SSE raw event: event='{sse.event}', data='{sse.data[:150]}...'")
                    try:
                        # 1. Parse the JSON-RPC response string from the 'data' field
                        response = SendTaskStreamingResponse(**json.loads(sse.data))

                        # 2. Check for JSON-RPC level errors first
                        if response.error:
                            logger.error(f"Received error in stream: {response.error.code} - {response.error.message}")
                            # Depending on the error, we might instead log and continue.
                            break  # Stop processing this stream on error

                        # 3. Process the successful 'result' part
                        if response.result:
                            task_id = response.result.id  # Get the task ID from the event result
                            is_final = getattr(response.result, 'final', False)  # Check the 'final' flag

                            # 4. Determine whether it's a status or an artifact update
                            if isinstance(response.result, TaskStatusUpdateEvent):
                                status_event = response.result
                                status_text = "(no message)"
                                # Safely extract the text message if present
                                if status_event.status.message and status_event.status.message.parts:
                                    text_part = next((p for p in status_event.status.message.parts if hasattr(p, 'text')), None)
                                    if text_part:
                                        status_text = text_part.text
                                logger.info(f"TASK [{task_id}] STATUS: {status_event.status.state} - Msg: '{status_text}'")
                                # --- PLACEHOLDER: Update application state/UI based on status ---

                            elif isinstance(response.result, TaskArtifactUpdateEvent):
                                artifact = response.result.artifact
                                logger.info(f"TASK [{task_id}] ARTIFACT: Name={artifact.name} Idx={artifact.index} Append={artifact.append} Last={artifact.lastChunk} Parts={len(artifact.parts)}")
                                # Buffer and reassemble streamed artifact chunks
                                task_buffer = artifact_buffers.setdefault(task_id, {})
                                artifact_index = artifact.index
                                if not artifact.append:
                                    # First chunk for this artifact index
                                    task_buffer[artifact_index] = artifact.parts
                                elif artifact_index in task_buffer:
                                    # Append parts to the existing buffer for this index
                                    task_buffer[artifact_index].extend(artifact.parts)
                                else:
                                    logger.warning(f"TASK [{task_id}] Received append=true for unknown artifact index {artifact_index}")
                                    task_buffer[artifact_index] = artifact.parts  # Start a buffer anyway
                                if artifact.lastChunk:
                                    logger.info(f"TASK [{task_id}] Received last chunk for artifact index {artifact_index}.")
                                    complete_artifact_parts = task_buffer.pop(artifact_index, [])  # Take the parts, drop the buffer
                                    # --- PLACEHOLDER: Process the complete artifact parts ---
                                    logger.info(f"TASK [{task_id}] Assembled artifact {artifact.name} (index {artifact_index}) with {len(complete_artifact_parts)} parts.")
                            else:
                                logger.warning(f"Received unknown A2A result type in stream: {type(response.result)}")

                            # 5. Check whether the server signalled the end of the stream
                            if is_final:
                                logger.info(f"TASK [{task_id}] Final event received via SSE. Closing stream.")
                                break  # Exit the 'async for sse' loop
                        else:
                            # A malformed JSON-RPC response: neither result nor error
                            logger.warning(f"Received valid SSE message but the JSON-RPC response lacked result/error: {sse.data}")
                    except json.JSONDecodeError:
                        logger.error(f"Failed to decode SSE data JSON: {sse.data}")
                    except Exception as e:
                        # Catch errors during parsing, validation (Pydantic), or processing
                        logger.error(f"Error processing received SSE event: {e}", exc_info=True)
    except httpx.RequestError as e:
        # Handle errors related to the HTTP connection itself
        logger.error(f"HTTP request error during SSE connection to {url}: {e}")
    except Exception as e:
        # Catch any other unexpected errors during stream consumption
        logger.error(f"Unexpected error consuming SSE stream from {url}: {e}", exc_info=True)
    finally:
        # Clean up the artifact buffer for this task when done or on error
        artifact_buffers.pop(task_id_from_request, None)
        logger.info(f"SSE consumer for task {task_id_from_request} finished or encountered an error.")
Hosting Implications
So, we’ve explored how A2A uses Server-Sent Events for streaming. Now comes the practical question: where do we actually run our A2A server application, especially if it needs to support these long-lived SSE connections? This choice has real implications for how our system behaves.
Cloud Functions?
A common thought might be to use standard serverless functions, like Google Cloud Functions. They’re great for short, event-driven tasks. However, we run into a challenge here. SSE requires the server to hold the HTTP connection open, potentially for minutes, while it pushes updates back to the client. Cloud Functions are typically designed with timeouts measured in seconds or single-digit minutes. They aren’t built for maintaining these persistent connections directly. Attempting to host a tasks/sendSubscribe endpoint directly on Cloud Functions would likely result in connections being dropped prematurely before our agent finishes its work, breaking the stream.
Cloud Run?
This leads us to consider services better suited for long-running requests within a serverless or managed environment. The recommended approach on Google Cloud is often Cloud Run. We package our A2A server application — perhaps built using Python with FastAPI or Starlette — into a container image. This image is stored in Google Artifact Registry. When we deploy this container to Cloud Run, the platform manages the underlying infrastructure for us, scaling it up or down based on traffic, even scaling down to zero when idle. Crucially, Cloud Run instances do support the long-lived HTTP/S connections necessary for SSE to function correctly according to the A2A specification. This gives us the convenience of serverless operations combined with the technical capability required for streaming. For securing these public-facing services, Cloud Run integrates well with mechanisms like Identity-Aware Proxy (IAP).
Agent Engine?
Another option we should consider, specifically designed for hosting AI agents on Google Cloud, is Vertex AI Agent Engine. This fully managed service aims to simplify the deployment and scaling of agents built with various Python frameworks (including ADK, LangGraph, LangChain, and others via custom templates). It abstracts away server setup, authentication, and IAM details, providing standard endpoints (like query and streamQuery). An A2A agent deployed this way can support the persistent connections that tasks/sendSubscribe needs, provided it is invoked through Agent Engine's mechanisms. Using Agent Engine could streamline the deployment process compared to configuring Cloud Run directly, particularly if we're already using its framework integrations and features like built-in evaluation or simplified auth.
GKE?
If we need more granular control over the networking environment, scaling policies, or underlying virtual machines, Google Kubernetes Engine (GKE) is a powerful alternative. We would deploy our containerised A2A server as a service within a GKE cluster. GKE has no inherent problem handling long-lived SSE connections. However, this approach comes with the added responsibility of managing the Kubernetes cluster infrastructure itself, which might be more overhead than necessary if Cloud Run meets our needs.
Considering these options, for most use cases aiming to implement A2A streaming as specified, Cloud Run or Agent Engine provide the most balanced solutions, offering scalability and serverless convenience while fully supporting the necessary long-lived HTTP connections.
Observability & Resiliency
Effective observability is vital. We could configure our application to send detailed, structured logs to Cloud Logging, making sure to include A2A Task IDs and Session IDs for tracing. Implementing distributed tracing using Cloud Trace (perhaps via OpenTelemetry libraries) helps us follow requests as they move between our orchestrator and different A2A agents. We also need Cloud Monitoring to watch performance metrics like latency, error rates, and resource usage, allowing us to set up alerts for potential problems. Finally, if we decide to implement the advanced tasks/resubscribe feature for added resilience against client disconnects, we'll need a shared, persistent datastore like Firestore or Memorystore (Redis) to hold the state of ongoing tasks so that any server instance can potentially resume the stream.
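As a small sketch of what that structured logging could look like in Python (the field names here are our own choice, not mandated by A2A):
# Sketch: attach A2A correlation IDs to every log record via a LoggerAdapter.
import logging

base_logger = logging.getLogger("a2a.server")

def task_logger(task_id: str, session_id: str) -> logging.LoggerAdapter:
    # These extra fields can be emitted by a structured/JSON formatter so that
    # Cloud Logging can filter and correlate on them.
    return logging.LoggerAdapter(base_logger, {"task_id": task_id, "session_id": session_id})

log = task_logger("task-podcast-123", "session-abc")
log.info("Drafting introduction...")  # Carries task_id/session_id as context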
Resubscription
What if the client’s network blips and the SSE connection drops? A2A specifies tasks/resubscribe so the client can try to reconnect to the same ongoing Task. Implementing this properly on the server side adds complexity. We’d need to save the task’s current state somewhere durable (like Firestore/Redis) and have logic to retrieve that state and figure out what updates the client missed when it reconnects. I will leave this topic for another time to explore 😬
Conclusion
While tasks/send handles quick A2A interactions, tasks/sendSubscribe is our tool for building responsive agents that perform longer tasks. Using Server-Sent Events, it allows server agents to push real-time status and results, improving the experience significantly.
Successfully using streaming involves getting the async programming right on both client and server, and choosing appropriate hosting — Cloud Run and Agent Engine stand out on Google Cloud for their serverless nature combined with support for the long-lived connections SSE needs. We also need to integrate well with tools like Cloud Logging, Trace, and Monitoring to keep things running smoothly. Understanding these pieces helps us build genuinely collaborative and effective multi-agent systems using the A2A protocol.
If you found this helpful please show your appreciation by clapping (you can press and hold for multiple claps 😃)