What Is the Agent2Agent Protocol (A2A) and Why You Must Learn It Now
The landscape of artificial intelligence is rapidly evolving. We're moving beyond single, monolithic AI models towards complex ecosystems of specialized AI agents. These agents, often built using different frameworks, by different teams, or even different vendors, need to collaborate to solve increasingly sophisticated problems. However, enabling seamless communication and interoperability between these diverse and often opaque agentic applications presents a significant hurdle. This is precisely the challenge the Agent2Agent (A2A) protocol aims to solve.
Developed as an open initiative driven by Google, the A2A protocol provides a standardized communication layer, a lingua franca, enabling agents built on disparate platforms to discover each other's capabilities, negotiate interactions, exchange information, and work together securely and effectively.
In a world increasingly reliant on multi-agent systems for tasks ranging from complex data analysis and workflow automation to sophisticated customer service and creative generation, the ability for these agents to interoperate is no longer a 'nice-to-have' but a critical necessity. Understanding and implementing A2A is becoming essential for developers, architects, and organizations looking to build scalable, flexible, and powerful AI solutions.
This article dives deep into the Agent2Agent protocol. We'll explore its core concepts, dissect its technical specifications, walk through practical implementation examples with code tutorials, and discuss why mastering A2A now is crucial for anyone involved in the future of AI development.
The Problem: Siloed Agents in a Collaborative World
Imagine an enterprise workflow:
- A customer service chatbot (Agent A, built on Framework X) identifies a complex technical issue.
- It needs to escalate this to a specialized diagnostic agent (Agent B, built internally using Framework Y).
- Agent B analyzes logs and determines a specific software patch is needed.
- It needs to instruct a deployment agent (Agent C, a third-party service using Framework Z) to apply the patch to the customer's specific environment.
Without a common communication standard, integrating these three agents is a complex, bespoke, and brittle process. Developers would need to write custom adapters and translation layers for each agent-pair interaction. Each new agent added, or any update to an existing agent's API, could break the entire chain. The result is a fragmented system, difficult to maintain, scale, or adapt.
This "silo" problem hinders innovation and limits the potential of multi-agent systems. We need a way for agents to:
- Discover: Find other agents and understand what they can do.
- Communicate: Exchange information (text, data, files) in a structured way.
- Coordinate: Manage tasks involving multiple steps and potentially multiple agents.
- Negotiate: Agree on interaction modalities (e.g., text, forms, audio).
- Secure: Interact safely, respecting authentication and authorization.
The Solution: Agent2Agent (A2A) Protocol
The A2A protocol addresses these challenges by defining a standard set of rules and message formats for inter-agent communication. It acts as an abstraction layer, hiding the underlying implementation details of individual agents.
Key Principles of A2A:
- Openness: A2A is an open protocol, fostering collaboration and preventing vendor lock-in. Anyone can implement or contribute to it.
- Interoperability: Its primary goal is to allow agents built with any framework (LangChain, CrewAI, Google ADK, Autogen, custom builds, etc.) or vendor platform to communicate.
- Task-Oriented: Communication revolves around asynchronous "Tasks," allowing for long-running operations and clear tracking of work units.
- Capability Discovery: Agents advertise their capabilities, skills, and supported interaction modes via a standardized "Agent Card."
- Rich Data Exchange: Supports various data types, including text, structured data (JSON, forms), and files (inline or via URI).
- Flexibility: Accommodates different interaction patterns, including simple request-response, streaming updates (via Server-Sent Events), and push notifications (webhooks).
- Security: Incorporates mechanisms for agents to declare their required authentication schemes.
By adopting A2A, developers can build modular and composable AI systems where agents can be swapped, updated, or added with minimal friction, significantly accelerating development and improving maintainability.
Why Learn A2A Now? The Urgency for Interoperability
The shift towards multi-agent architectures is accelerating. Here's why understanding A2A is becoming critical:
- The Rise of Specialized Agents: General-purpose LLMs are powerful, but complex tasks often benefit from smaller, specialized agents working in concert. A2A is the glue that binds them.
- Enterprise AI Adoption: Businesses are moving beyond experimentation to integrate AI into core processes. This requires robust, scalable, and maintainable solutions, which A2A facilitates through standardized communication.
- Avoiding Vendor Lock-in: Relying solely on a single vendor's agent platform creates dependencies. A2A promotes flexibility, allowing organizations to leverage best-of-breed agents from various sources.
- Future-Proofing Applications: Building with interoperability in mind from the start makes applications more adaptable to future changes in AI technology and frameworks.
- Enabling Complex Workflows: Many valuable AI applications involve sequences of tasks handled by different agents (like the customer service example). A2A provides the necessary orchestration framework.
- Foundation for Agent Collaboration: As research into agent collaboration, negotiation, and dynamic team formation progresses, standardized protocols like A2A will be fundamental.
Learning A2A now positions you at the forefront of this paradigm shift, equipping you with the skills to build the next generation of collaborative AI systems.
A2A Core Concepts: A Technical Deep Dive
The A2A protocol is built upon established web standards, primarily using JSON-RPC 2.0 over HTTP(S) for request/response interactions and Server-Sent Events (SSE) for streaming. Let's break down the fundamental components:
1. Agent Card (`/.well-known/agent.json`)
This is the public "business card" of an A2A-compliant agent. It's a JSON file, typically hosted at a well-known URL (`/.well-known/agent.json`) relative to the agent's base URL, that describes the agent to potential clients.
- Purpose: Discovery. Clients fetch this card to learn about an agent before interacting with it.
- Key Fields (from `specification/json/a2a.json` and `samples/python/common/types.py`):
  - `name` (string, required): Human-readable name of the agent.
  - `description` (string, optional): A brief description of the agent's purpose.
  - `url` (string, required): The HTTP(S) endpoint where the agent listens for A2A requests (JSON-RPC).
  - `version` (string, required): The version of the agent implementation or the A2A spec it adheres to.
  - `capabilities` (object, required): Describes the agent's protocol-level features.
    - `streaming` (boolean, default: false): Does the agent support `tasks/sendSubscribe` via SSE?
    - `pushNotifications` (boolean, default: false): Does the agent support sending updates via webhooks?
    - `stateTransitionHistory` (boolean, default: false): Does the agent include historical status transitions in Task objects?
  - `authentication` (object, optional): Specifies required authentication.
    - `schemes` (array of strings, required): Lists supported authentication schemes (e.g., "bearer", "apiKey", custom schemes).
    - `credentials` (string, optional): Potentially hints at where/how to obtain credentials (use with caution).
  - `defaultInputModes` / `defaultOutputModes` (array of strings, optional): Default content types the agent expects/produces (e.g., "text", "application/json", "image/png").
  - `skills` (array of objects, required): Describes the specific functionalities the agent offers. Each skill has:
    - `id` (string, required): A unique identifier for the skill.
    - `name` (string, required): Human-readable name of the skill.
    - `description` (string, optional): What the skill does.
    - `inputModes` / `outputModes` (array of strings, optional): Specific content types supported for this skill.
    - `examples` (array of strings, optional): Example prompts or use cases.
Example `AgentCard` Structure (Conceptual):
```json
{
  "name": "Image Generation Agent",
  "description": "Generates images based on text prompts.",
  "url": "https://api.example-image-agent.com/a2a",
  "version": "1.0.0",
  "capabilities": {
    "streaming": true,
    "pushNotifications": false,
    "stateTransitionHistory": true
  },
  "authentication": {
    "schemes": ["apiKey"]
  },
  "defaultInputModes": ["text"],
  "defaultOutputModes": ["image/png"],
  "skills": [
    {
      "id": "generate_image",
      "name": "Generate Image",
      "description": "Creates an image from a textual description.",
      "inputModes": ["text"],
      "outputModes": ["image/png"],
      "examples": ["Generate an image of a 'blue cat wearing a top hat'"]
    }
  ]
}
```
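To make discovery concrete, here is a minimal sketch of a client fetching and inspecting an Agent Card with `httpx` (the HTTP library the Python samples also use). The helper name and the bare-dictionary handling are illustrative assumptions; the samples wrap this logic in their own card resolver utilities.

```python
# fetch_agent_card.py - minimal sketch, not the samples' card_resolver
import asyncio
import httpx

async def fetch_agent_card(base_url: str) -> dict:
    """Fetch an agent's public Agent Card from its well-known URL."""
    card_url = f"{base_url.rstrip('/')}/.well-known/agent.json"
    async with httpx.AsyncClient() as http:
        resp = await http.get(card_url, timeout=10)
        resp.raise_for_status()
        return resp.json()

async def main():
    # Hypothetical agent base URL, matching the conceptual card above
    card = await fetch_agent_card("https://api.example-image-agent.com")
    print(f"Agent: {card['name']} (v{card['version']})")
    print(f"Streaming supported: {card['capabilities'].get('streaming', False)}")
    for skill in card.get("skills", []):
        print(f"- Skill: {skill['id']}: {skill.get('description', '')}")

if __name__ == "__main__":
    asyncio.run(main())
```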
2. A2A Server
An agent application that exposes an HTTP endpoint implementing the A2A protocol methods. It listens for incoming JSON-RPC requests, processes them (often involving its underlying AI logic), manages task lifecycles, and sends back responses or streams updates.
The Python samples (`samples/python/common/server/server.py`) use Starlette (an ASGI framework) to create the server, mapping the endpoint defined in the `AgentCard`'s `url` field to a request-processing function.
3. A2A Client
Any application or another agent that consumes the services of an A2A Server. It constructs JSON-RPC requests (like `tasks/send`), sends them to the server's URL, and handles the responses or streamed events.
The Python samples (`samples/python/common/client/client.py`) provide an `A2AClient` class that abstracts the HTTP requests (using `httpx`) and SSE handling (using `httpx_sse`).
4. Task
The central concept in A2A. A task represents a unit of work initiated by a client and performed by the server agent.
- Lifecycle: Tasks are typically long-running and asynchronous.
- Identification: Each task has a unique `id` (usually a UUID generated by the client), allowing both client and server to track it. An optional `sessionId` can group related tasks.
- State: Tasks progress through defined states (the `TaskState` enum in `types.py`):
  - `submitted`: Initial state after the client sends the request.
  - `working`: The agent is actively processing the task.
  - `input-required`: The agent needs more input from the client to proceed (enables multi-turn conversations).
  - `completed`: The task finished successfully.
  - `failed`: The task terminated due to an error.
  - `canceled`: The task was canceled by the client.
  - `unknown`: An indeterminate state.
- Structure (the `Task` model in `types.py`):
  - `id` (string)
  - `sessionId` (string, optional)
  - `status` (`TaskStatus` object): Contains the current `state`, a timestamp, and potentially the latest `Message` from the agent (e.g., requesting input).
  - `artifacts` (list of `Artifact` objects, optional): Outputs generated by the task (see below).
  - `history` (list of `Message` objects, optional): A record of the conversation turns for this task (the client can request a specific length).
  - `metadata` (dict, optional): Arbitrary key-value data.
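As an illustration, a `Task` that is waiting for more user input might serialize to something like the following. The field values are hypothetical; the shape simply follows the structure listed above.

```json
{
  "id": "task-123",
  "sessionId": "session-abc",
  "status": {
    "state": "input-required",
    "message": {
      "role": "agent",
      "parts": [{ "type": "text", "text": "Which environment should I patch: staging or production?" }]
    },
    "timestamp": "2024-01-01T12:00:00Z"
  },
  "artifacts": [],
  "history": [],
  "metadata": null
}
```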
5. Message
Represents a single turn of communication within a Task.
- Structure (the `Message` model in `types.py`):
  - `role` (enum: "user" or "agent"): Indicates the sender.
  - `parts` (list of `Part` objects, required): The content of the message (see below).
  - `metadata` (dict, optional).
6. Part
The fundamental unit of content within a `Message` or an `Artifact`. A message can contain multiple parts of different types.
- Types (the `Part` union in `types.py`):
  - `TextPart`: Contains plain text (`text` field).
  - `FilePart`: Represents a file. Contains a `file` object which requires either:
    - `bytes`: Base64-encoded file content (for small files), or
    - `uri`: A URI pointing to the file location. It can also include `name` and `mimeType`.
  - `DataPart`: Holds structured JSON data (`data` field). Useful for forms, structured results, etc.
- Each `Part` can also carry optional `metadata`.
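Putting the two together, a single user `Message` mixing a text part and a structured data part could look roughly like this (field values are illustrative):

```json
{
  "role": "user",
  "parts": [
    { "type": "text", "text": "Apply the patch described in the attached ticket." },
    { "type": "data", "data": { "ticketId": "INC-4211", "severity": "high" } }
  ]
}
```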
7. Artifact
Represents outputs generated by the agent during task execution, separate from conversational messages. Examples include generated files (code, images, documents) or final structured data results.
- Structure (the `Artifact` model in `types.py`):
  - `name` / `description` (string, optional)
  - `parts` (list of `Part` objects, required): The content of the artifact.
  - `metadata` (dict, optional)
  - `index`, `append`, `lastChunk` (optional): Used for streaming large artifacts in chunks.
8. Communication Flow (Typical)
- Discovery: The client agent fetches the server agent's `AgentCard` from `/.well-known/agent.json` and checks its capabilities, authentication requirements, and available skills.
- Initiation: The client generates a unique task ID and sends an initial `Message` (role: "user") within a `tasks/send` (for simple request/response) or `tasks/sendSubscribe` (for streaming) request to the server's A2A endpoint URL.
- Processing & Updates:
  - Non-Streaming (`tasks/send`): The server processes the task synchronously from the client's perspective (though it may be asynchronous internally) and eventually returns a final `Task` object in the JSON-RPC response, indicating a terminal state (`completed`, `failed`).
  - Streaming (`tasks/sendSubscribe`): The server immediately acknowledges the request (often with an initial `TaskStatusUpdateEvent` indicating `submitted` or `working`). It then uses Server-Sent Events (SSE) over the same HTTP connection to push updates to the client as the task progresses. These updates can be:
    - `TaskStatusUpdateEvent`: Notifies changes in the task's `state` (e.g., `working` -> `input-required`, `working` -> `completed`).
    - `TaskArtifactUpdateEvent`: Sends generated `Artifact` data, potentially chunked.
- Interaction (if `input-required`): If the server agent sends a `TaskStatusUpdateEvent` with state `input-required` (optionally including a `Message` asking a question), the client sends a subsequent `Message` (role: "user") using the same task ID via another `tasks/send` or `tasks/sendSubscribe` request to provide the needed input.
- Completion: The task eventually reaches a terminal state (`completed`, `failed`, `canceled`), indicated in the final `Task` object (for `tasks/send`) or by the `final: true` flag in a `TaskStatusUpdateEvent` (for `tasks/sendSubscribe`).
9. JSON-RPC Methods
A2A defines standard JSON-RPC 2.0 methods for task management:
- `tasks/send`: Initiates or continues a task; expects a single `Task` object in the response upon completion.
- `tasks/sendSubscribe`: Initiates or continues a task; expects a stream of SSE updates (`TaskStatusUpdateEvent`, `TaskArtifactUpdateEvent`).
- `tasks/get`: Retrieves the current state and history of a specific task by its ID.
- `tasks/cancel`: Requests cancellation of an ongoing task. Success is not guaranteed (the agent might be too far along or not support cancellation).
- `tasks/pushNotification/set`: (If supported) The client provides a webhook URL for the server to push task updates to, instead of relying on SSE or polling `tasks/get`.
- `tasks/pushNotification/get`: Retrieves the currently configured push notification settings for a task.
- `tasks/resubscribe`: (If supported) Allows a client to reconnect to the SSE stream for an existing task (e.g., after a network interruption).
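For example, a cancellation request and a typical "not cancelable" error reply might look like this on the wire. The IDs and the error message are illustrative; the custom error codes are the ones listed later in the Advanced Topics section.

```json
// Request
{
  "jsonrpc": "2.0",
  "id": "req-42",
  "method": "tasks/cancel",
  "params": { "id": "task-123" }
}

// Error response: the agent cannot (or will not) cancel this task
{
  "jsonrpc": "2.0",
  "id": "req-42",
  "error": { "code": -32002, "message": "Task cannot be canceled" }
}
```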
Getting Started: Tutorials and Code Examples
Let's make this concrete. We'll use the Python samples provided in the `A2A-main/samples/python/` directory as the basis for our tutorials. These samples provide common client (`common/client/`) and server (`common/server/`) components, along with specific agent implementations (`agents/`) and host applications (`hosts/`).
Prerequisites:
- Python 3.10+ (as indicated by sample setups)
- Familiarity with basic Python, async/await, and HTTP concepts.
- Access to the `A2A-main` codebase (you can likely clone it from its source if it's public).
- An environment manager such as `venv` or `conda`.
- An installation tool such as `pip` or `uv` (the samples use `uv`).
Setting up the Environment (Example using `venv` and `pip`):
```bash
# Navigate to the python samples directory
cd A2A-main/samples/python

# Create a virtual environment
python -m venv .venv

# Activate the environment
# On macOS/Linux:
source .venv/bin/activate
# On Windows:
# .venv\Scripts\activate

# Install dependencies (assuming a requirements.txt exists or from pyproject.toml)
# If using pyproject.toml (like the samples):
pip install .  # Installs dependencies defined in pyproject.toml

# You might need specific API keys (e.g., a Google API key)
# Set them as environment variables or using a .env file
# export GOOGLE_API_KEY="YOUR_API_KEY_HERE"
```
Tutorial 1: Building a Simple A2A Server (Echo Agent)
We'll create a basic agent that simply echoes back the user's message.
Define the Agent Card: Create a Python script (e.g., `echo_server.py`) and define the `AgentCard`.

```python
# echo_server.py
import os
import uvicorn
import asyncio
import logging
from uuid import uuid4

# Assuming common types and server are importable
# (adjust imports based on your project structure)
from common.types import (
    AgentCard, AgentCapabilities, AgentSkill, Task, TaskState, TaskStatus,
    Message, TextPart, SendTaskRequest, SendTaskResponse, JSONRPCResponse,
    JSONRPCError, InternalError
)
from common.server import A2AServer, TaskManager

# Configure basic logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Agent Definition ---
ECHO_AGENT_CARD = AgentCard(
    name="Echo Agent",
    description="A simple A2A agent that echoes back user messages.",
    url="http://localhost:8001/a2a",  # Where this server will run
    version="0.1.0",
    capabilities=AgentCapabilities(
        streaming=False,  # This simple agent won't stream
        pushNotifications=False,
        stateTransitionHistory=False
    ),
    authentication=None,  # No auth for this simple example
    defaultInputModes=["text"],
    defaultOutputModes=["text"],
    skills=[
        AgentSkill(
            id="echo",
            name="Echo Message",
            description="Receives a text message and sends it back.",
            inputModes=["text"],
            outputModes=["text"],
            examples=["'Hello there!' -> 'Hello there!'"]
        )
    ]
)

# --- Task Management Logic ---
class EchoTaskManager(TaskManager):
    def __init__(self):
        # Simple in-memory store for tasks
        self.tasks: dict[str, Task] = {}
        self.lock = asyncio.Lock()

    async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
        task_params = request.params
        task_id = task_params.id
        user_message = task_params.message
        logger.info(f"Received task {task_id} with message: {user_message.parts}")

        # Basic validation: Expecting a single TextPart
        if not user_message.parts or not isinstance(user_message.parts[0], TextPart):
            logger.error(f"Task {task_id}: Invalid input - expected TextPart.")
            error = JSONRPCError(code=-32602, message="Invalid input: Expected a single TextPart.")
            return SendTaskResponse(id=request.id, error=error)

        user_text = user_message.parts[0].text

        # Create the agent's response message
        agent_response_message = Message(
            role="agent",
            parts=[TextPart(text=f"You said: {user_text}")]
        )

        # Create the final Task object
        final_task_status = TaskStatus(
            state=TaskState.COMPLETED,
            message=agent_response_message  # Include final message in status
        )
        completed_task = Task(
            id=task_id,
            sessionId=task_params.sessionId,
            status=final_task_status,
            artifacts=[],  # No artifacts for echo
            history=[user_message, agent_response_message]  # Simple history
        )

        # Store the completed task (optional for this simple case)
        async with self.lock:
            self.tasks[task_id] = completed_task

        logger.info(f"Task {task_id} completed.")
        # Return the completed task in the response
        return SendTaskResponse(id=request.id, result=completed_task)

    # --- Implement other required abstract methods (can raise NotImplemented) ---
    async def on_get_task(self, request):
        # Basic implementation for demonstration
        async with self.lock:
            task = self.tasks.get(request.params.id)
        if task:
            return JSONRPCResponse(id=request.id, result=task)
        else:
            error = JSONRPCError(code=-32001, message="Task not found")
            return JSONRPCResponse(id=request.id, error=error)

    async def on_cancel_task(self, request):
        error = JSONRPCError(code=-32004, message="Cancel not supported")
        return JSONRPCResponse(id=request.id, error=error)

    async def on_send_task_subscribe(self, request):
        error = JSONRPCError(code=-32004, message="Streaming not supported")
        return JSONRPCResponse(id=request.id, error=error)

    async def on_set_task_push_notification(self, request):
        error = JSONRPCError(code=-32003, message="Push notifications not supported")
        return JSONRPCResponse(id=request.id, error=error)

    async def on_get_task_push_notification(self, request):
        error = JSONRPCError(code=-32003, message="Push notifications not supported")
        return JSONRPCResponse(id=request.id, error=error)

    async def on_resubscribe_to_task(self, request):
        error = JSONRPCError(code=-32004, message="Resubscribe not supported")
        return JSONRPCResponse(id=request.id, error=error)

# --- Server Setup ---
if __name__ == "__main__":
    task_manager = EchoTaskManager()
    server = A2AServer(
        host="localhost",
        port=8001,
        endpoint="/a2a",  # Matches AgentCard URL path
        agent_card=ECHO_AGENT_CARD,
        task_manager=task_manager
    )
    print("Starting Echo A2A Server on http://localhost:8001")
    # Use server.start(), which calls uvicorn.run
    # Note: For production, use a proper ASGI server like uvicorn or hypercorn directly
    server.start()
    # Alternatively, run directly with uvicorn:
    # uvicorn.run(server.app, host="localhost", port=8001)
```
Run the Server:
```bash
python echo_server.py
```
You should see output indicating the server has started on `http://localhost:8001`.

Test with `curl` (or create a client): You can send a JSON-RPC request using `curl`:
```bash
curl -X POST http://localhost:8001/a2a \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "method": "tasks/send",
    "id": "my-echo-task-123",
    "params": {
      "id": "my-echo-task-123",
      "sessionId": "session-abc",
      "message": {
        "role": "user",
        "parts": [
          { "type": "text", "text": "Hello A2A World!" }
        ]
      }
    }
  }'
```
Expected Response: You should receive a JSON-RPC response containing the completed `Task` object:
```json
{
  "jsonrpc": "2.0",
  "id": "my-echo-task-123",
  "result": {
    "id": "my-echo-task-123",
    "sessionId": "session-abc",
    "status": {
      "state": "completed",
      "message": {
        "role": "agent",
        "parts": [
          { "type": "text", "text": "You said: Hello A2A World!" }
        ],
        "metadata": null
      },
      "timestamp": "..." // ISO timestamp
    },
    "artifacts": [],
    "history": [
      {
        "role": "user",
        "parts": [
          { "type": "text", "text": "Hello A2A World!" }
        ],
        "metadata": null
      },
      {
        "role": "agent",
        "parts": [
          { "type": "text", "text": "You said: Hello A2A World!" }
        ],
        "metadata": null
      }
    ],
    "metadata": null
  },
  "error": null
}
```
This simple example demonstrates the basic request/response flow using `tasks/send`.
Tutorial 2: Building an A2A Client (Python)
Now, let's create a client to interact with our Echo Server.
```python
# echo_client.py
import asyncio
import logging
from uuid import uuid4

# Assuming common types and client are importable
from common.client import A2AClient, card_resolver  # card_resolver might be needed
from common.types import Message, TextPart, AgentCard  # Import AgentCard if needed directly

# Configure basic logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

ECHO_SERVER_URL = "http://localhost:8001/a2a"  # URL from Echo Server's AgentCard

async def main():
    # In a real scenario, you might fetch the AgentCard first
    # try:
    #     agent_card = await card_resolver.fetch_agent_card(ECHO_SERVER_URL)
    #     client = A2AClient(agent_card=agent_card)
    # except Exception as e:
    #     logger.error(f"Failed to fetch AgentCard or initialize client: {e}")
    #     return
    # For simplicity, we'll use the URL directly
    client = A2AClient(url=ECHO_SERVER_URL)

    task_id = f"echo-task-{uuid4().hex}"
    session_id = f"session-{uuid4().hex}"
    user_text = "Testing the echo client!"

    # Construct the user message
    user_message = Message(
        role="user",
        parts=[TextPart(text=user_text)]
    )

    # Prepare the parameters for tasks/send
    send_params = {
        "id": task_id,
        "sessionId": session_id,
        "message": user_message,
        # Optional: acceptedOutputModes, pushNotification, historyLength, metadata
    }

    try:
        logger.info(f"Sending task {task_id} to {ECHO_SERVER_URL}...")
        # Use the client's send_task method
        # It handles constructing the JSONRPCRequest internally
        response = await client.send_task(payload=send_params)

        if response.error:
            logger.error(f"Task {task_id} failed: {response.error.message} (Code: {response.error.code})")
        elif response.result:
            task_result = response.result
            logger.info(f"Task {task_id} completed with state: {task_result.status.state}")
            if task_result.status.message and task_result.status.message.parts:
                agent_part = task_result.status.message.parts[0]
                if isinstance(agent_part, TextPart):
                    logger.info(f"Agent response: {agent_part.text}")
                else:
                    logger.warning("Agent response was not TextPart")
            else:
                logger.warning("No message part in agent response status")
        else:
            logger.error(f"Received unexpected response for task {task_id}: {response}")
    except Exception as e:
        logger.error(f"An error occurred while communicating with the agent: {e}")

if __name__ == "__main__":
    asyncio.run(main())
```
Run the Client:
Ensure `echo_server.py` is running first, then run the client:
```bash
python echo_client.py
```
You should see log output showing the task being sent and the echoed response received from the server.
Tutorial 3: Handling Streaming (`tasks/sendSubscribe`)
Let's modify the server and client to use streaming via SSE. Imagine an agent that provides status updates as it "works".
Server Modifications (`streaming_echo_server.py`):
```python
# streaming_echo_server.py (Modifications based on echo_server.py)
# ... (Imports similar to echo_server.py, add AsyncIterable)
import time
from typing import AsyncIterable

from common.types import (
    # ... other imports ...
    TaskState, TaskStatus, TaskStatusUpdateEvent, SendTaskStreamingRequest,
    SendTaskStreamingResponse
)
from common.server import A2AServer, TaskManager

# --- Agent Definition (Update Capabilities) ---
STREAMING_ECHO_AGENT_CARD = AgentCard(
    # ... (name, description, url, version, etc. as before) ...
    url="http://localhost:8002/a2a",  # Use a different port
    capabilities=AgentCapabilities(
        streaming=True,  # <<< Enable streaming capability
        pushNotifications=False,
        stateTransitionHistory=True  # Let's include history
    ),
    # ... (rest of the card) ...
)

# --- Task Management Logic (Implement on_send_task_subscribe) ---
class StreamingEchoTaskManager(TaskManager):
    def __init__(self):
        self.tasks: dict[str, Task] = {}  # Store task state
        self.lock = asyncio.Lock()
        # NOTE: The common InMemoryTaskManager in the samples handles SSE queueing.
        # If NOT using that base class, you'd need SSE queue management here.
        # For this example, we'll simulate the async generator directly.

    async def _do_work(self, task_id: str, user_text: str) -> AsyncIterable[SendTaskStreamingResponse]:
        logger.info(f"Task {task_id}: Starting work...")

        # 1. Send 'working' status update
        working_status = TaskStatus(state=TaskState.WORKING)
        yield SendTaskStreamingResponse(
            id=None,  # SSE events don't need a request ID
            result=TaskStatusUpdateEvent(id=task_id, status=working_status)
        )
        await asyncio.sleep(1)  # Simulate work

        # 2. Simulate some progress (optional)
        progress_status = TaskStatus(
            state=TaskState.WORKING,
            message=Message(role="agent", parts=[TextPart(text="Thinking...")])
        )
        yield SendTaskStreamingResponse(
            id=None,
            result=TaskStatusUpdateEvent(id=task_id, status=progress_status)
        )
        await asyncio.sleep(2)  # Simulate more work

        # 3. Send 'completed' status update with the final message
        agent_response_message = Message(
            role="agent",
            parts=[TextPart(text=f"You said (streamed): {user_text}")]
        )
        completed_status = TaskStatus(
            state=TaskState.COMPLETED,
            message=agent_response_message
        )
        yield SendTaskStreamingResponse(
            id=None,
            result=TaskStatusUpdateEvent(id=task_id, status=completed_status, final=True)  # Mark as final
        )
        logger.info(f"Task {task_id}: Work completed.")

    async def on_send_task_subscribe(
        self, request: SendTaskStreamingRequest
    ) -> AsyncIterable[SendTaskStreamingResponse]:
        task_params = request.params
        task_id = task_params.id
        user_message = task_params.message
        logger.info(f"Received streaming task {task_id}")

        # Validate input
        if not user_message.parts or not isinstance(user_message.parts[0], TextPart):
            logger.error(f"Task {task_id}: Invalid input.")
            error = JSONRPCError(code=-32602, message="Invalid input")
            # We cannot easily yield an error at the start of the SSE stream with this structure.
            # A real server might handle this differently (e.g., an immediate error response
            # before starting the stream, or an error event in the stream).
            # For simplicity, log and raise (or return an empty async iterator).
            raise ValueError("Invalid input for streaming task")

        user_text = user_message.parts[0].text

        # Store initial task state (optional but good practice)
        initial_task = Task(
            id=task_id,
            sessionId=task_params.sessionId,
            status=TaskStatus(state=TaskState.SUBMITTED),  # Or WORKING immediately
            history=[user_message]
        )
        async with self.lock:
            self.tasks[task_id] = initial_task

        # Return the async generator that yields SSE events
        return self._do_work(task_id, user_text)

    # --- Implement other methods (on_send_task, on_get_task, etc.) ---
    # ... (Implementations similar to EchoTaskManager, adapting for the streaming context
    #      or raising NotImplemented if only streaming is supported) ...
    async def on_send_task(self, request):
        error = JSONRPCError(code=-32004, message="Use tasks/sendSubscribe for this agent")
        return SendTaskResponse(id=request.id, error=error)

    # ... other methods ...

# --- Server Setup ---
if __name__ == "__main__":
    task_manager = StreamingEchoTaskManager()
    server = A2AServer(
        host="localhost",
        port=8002,  # Different port
        endpoint="/a2a",
        agent_card=STREAMING_ECHO_AGENT_CARD,
        task_manager=task_manager
    )
    print("Starting Streaming Echo A2A Server on http://localhost:8002")
    server.start()
```
Client Modifications (`streaming_echo_client.py`):
```python
# streaming_echo_client.py (Modifications based on echo_client.py)
import asyncio
import logging
from uuid import uuid4

from common.client import A2AClient
from common.types import Message, TextPart, TaskStatusUpdateEvent, TaskArtifactUpdateEvent

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

STREAMING_ECHO_SERVER_URL = "http://localhost:8002/a2a"

async def main():
    client = A2AClient(url=STREAMING_ECHO_SERVER_URL)
    task_id = f"stream-task-{uuid4().hex}"
    user_text = "Testing the streaming client!"
    user_message = Message(role="user", parts=[TextPart(text=user_text)])
    send_params = {
        "id": task_id,
        "message": user_message,
    }

    try:
        logger.info(f"Sending streaming task {task_id} to {STREAMING_ECHO_SERVER_URL}...")
        # Use the client's send_task_streaming method
        async for response in client.send_task_streaming(payload=send_params):
            if response.error:
                # Errors might be sent as part of the stream in some implementations
                logger.error(f"Received error in stream for task {task_id}: {response.error.message}")
                break  # Stop processing the stream on error
            elif response.result:
                event = response.result
                if isinstance(event, TaskStatusUpdateEvent):
                    logger.info(f"Task {task_id} Status Update: {event.status.state}")
                    if event.status.message and event.status.message.parts:
                        part = event.status.message.parts[0]
                        if isinstance(part, TextPart):
                            logger.info(f"  Agent Message: {part.text}")
                    if event.final:
                        logger.info(f"Task {task_id} reached final state.")
                        break  # Exit the loop once the task is final
                elif isinstance(event, TaskArtifactUpdateEvent):
                    logger.info(f"Task {task_id} Artifact Update: {event.artifact.name}")
                    # Process artifact parts...
                else:
                    logger.warning(f"Received unknown event type in stream: {type(event)}")
            else:
                logger.error(f"Received unexpected empty response in stream for task {task_id}")
    except Exception as e:
        logger.error(f"An error occurred during streaming communication: {e}")

if __name__ == "__main__":
    asyncio.run(main())
```
Run: Start `streaming_echo_server.py`, then run `streaming_echo_client.py`. You'll see the client log status updates (`WORKING`, `COMPLETED`) and the final echoed message as they are streamed from the server.
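For reference, the raw SSE stream carrying those updates is essentially a series of `data:` lines, one JSON-RPC response per event. The exact framing and field order depend on the server implementation, so treat this as an illustrative sketch rather than a byte-exact transcript:

```text
HTTP/1.1 200 OK
Content-Type: text/event-stream

data: {"jsonrpc": "2.0", "id": null, "result": {"id": "stream-task-...", "status": {"state": "working"}, "final": false}}

data: {"jsonrpc": "2.0", "id": null, "result": {"id": "stream-task-...", "status": {"state": "completed", "message": {"role": "agent", "parts": [{"type": "text", "text": "You said (streamed): Testing the streaming client!"}]}}, "final": true}}
```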
Tutorial 4: Working with Different Data Parts (`FilePart`, `DataPart`)
A2A's strength lies in handling diverse data.
Sending Files (`FilePart`):
- Client: When creating the `Message` or `Artifact`, add a `FilePart`.
  - For small files: Read the content, base64-encode it, and put it in `FilePart(file=FileContent(bytes=encoded_content, name="...", mimeType="..."))`.
  - For large files or external references: Use `FilePart(file=FileContent(uri="file:///path/to/local/file", name="...", mimeType="..."))` or `FilePart(file=FileContent(uri="https://example.com/remote/file", name="...", mimeType="..."))`. The server needs to be able to resolve and access the URI.
- Server: In the task handler, check `message.parts` or `artifact.parts`. If a `FilePart` is found, decode the `bytes` or fetch the content from the `uri` (see the server-side sketch after the client snippet below).

Sending Structured Data (`DataPart`):
- Client: Create a dictionary with the structured data and add `DataPart(data=my_dict)` to the `Message` or `Artifact`.
- Server: Check parts for a `DataPart` and access the dictionary via `part.data`. Useful for sending JSON payloads, form data, or configuration.
Example Snippet (Client sending different parts):
```python
# client_sending_parts.py (Conceptual)
import base64
import logging

from common.types import Message, TextPart, FilePart, DataPart, FileContent

logger = logging.getLogger(__name__)

# ... client setup ...

# 1. Text
text_part = TextPart(text="Analyze the attached data and file.")

# 2. Structured Data
json_data = {"user_id": 123, "preferences": {"theme": "dark", "notifications": True}}
data_part = DataPart(data=json_data, metadata={"source": "user_profile"})

# 3. File (Inline - small file)
try:
    with open("report.txt", "rb") as f:
        file_bytes = f.read()
    encoded_bytes = base64.b64encode(file_bytes).decode('utf-8')
    file_part_inline = FilePart(
        file=FileContent(bytes=encoded_bytes, name="report.txt", mimeType="text/plain")
    )
except Exception as e:
    logger.error(f"Failed to read inline file: {e}")
    file_part_inline = None

# 4. File (URI - assumes the server can access it)
file_part_uri = FilePart(
    file=FileContent(uri="file:///shared/data/large_dataset.csv", name="large_dataset.csv", mimeType="text/csv")
)

parts_list = [text_part, data_part]
if file_part_inline:
    parts_list.append(file_part_inline)
parts_list.append(file_part_uri)

user_message = Message(role="user", parts=parts_list)

# ... send message using client.send_task(...) ...
```
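On the server side, the matching work is to walk `message.parts` and branch on the part type. A minimal sketch of that dispatch inside a task handler might look like this; the helper name `handle_parts` and the decoding choices are illustrative assumptions, not code from the samples:

```python
# handle_parts.py - illustrative server-side dispatch over message parts
import base64
from common.types import Message, TextPart, FilePart, DataPart

def handle_parts(message: Message) -> dict:
    """Collect the usable content from each part of an incoming message."""
    collected = {"texts": [], "data": [], "files": []}
    for part in message.parts:
        if isinstance(part, TextPart):
            collected["texts"].append(part.text)
        elif isinstance(part, DataPart):
            collected["data"].append(part.data)  # already a dict
        elif isinstance(part, FilePart):
            if part.file.bytes:
                # Inline file: decode the base64 payload
                collected["files"].append({
                    "name": part.file.name,
                    "content": base64.b64decode(part.file.bytes),
                })
            elif part.file.uri:
                # Referenced file: defer fetching to whatever resolver you trust
                collected["files"].append({"name": part.file.name, "uri": part.file.uri})
    return collected
```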
Advanced Topics & Use Cases
- Agent Discovery: The `/.well-known/agent.json` endpoint is the standard mechanism. Clients should fetch this first; implementations might add caching strategies. The `card_resolver.py` in the Python samples hints at this.
- Authentication: The `AgentCard` declares the supported `schemes`. The client must choose a compatible scheme and include the necessary credentials (e.g., a Bearer token in the `Authorization` header, or an API key in a header or query parameter) when making HTTP requests to the A2A `url`. The A2A protocol itself doesn't dictate how auth is implemented, only how it's advertised. Secure handling of credentials is vital.
- Error Handling: A2A uses the standard JSON-RPC 2.0 error codes (`-32700` Parse Error, `-32600` Invalid Request, `-32601` Method Not Found, `-32602` Invalid Params, `-32603` Internal Error) plus custom codes for A2A-specific issues (e.g., `-32001` Task Not Found, `-32002` Task Not Cancelable). Servers should return appropriate error objects, and clients must handle them gracefully. The sample `A2AServer` shows basic error mapping.
- Push Notifications: For scenarios where SSE is unsuitable (e.g., a client behind a restrictive firewall, or very long-lived tasks where keeping a connection open is costly), push notifications offer an alternative (see the sketch after this list):
  - The client calls `tasks/pushNotification/set`, providing its own webhook `url`, an optional `token` for verification, and potentially the `authentication` details the server needs to call the client's webhook.
  - The server stores this configuration for the task.
  - When a task update occurs, the server makes an HTTP POST request to the client's webhook `url`, sending the `TaskStatusUpdateEvent` or `TaskArtifactUpdateEvent` in the request body (potentially verified using the provided `token`).
  - The client's webhook endpoint receives the update.
- Integration with Frameworks: The true power of A2A shines when bridging different agent frameworks. The `A2A-main/samples/` directory shows examples for:
  - Google Agent Development Kit (ADK)
  - CrewAI
  - LangGraph
  - Genkit (JavaScript)
  These samples typically involve wrapping the framework's agent logic within an A2A Server implementation (like the `TaskManager`) and using the `A2AClient` to call other A2A agents.
- Enterprise Readiness: A2A is designed with enterprise needs in mind, supporting security declarations, asynchronous operations suitable for complex workflows, and clear interfaces for monitoring and management (via `tasks/get`).
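As a rough illustration of the first step of that push notification flow, a `tasks/pushNotification/set` request carrying the client's webhook details might look like the following. The nested config shape and the values are a hedged sketch based on the fields described above, not a verbatim excerpt from the specification:

```json
{
  "jsonrpc": "2.0",
  "id": "req-7",
  "method": "tasks/pushNotification/set",
  "params": {
    "id": "task-123",
    "pushNotificationConfig": {
      "url": "https://client.example.com/a2a/webhook",
      "token": "opaque-verification-token",
      "authentication": { "schemes": ["bearer"] }
    }
  }
}
```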
The Future of A2A
The A2A protocol is actively evolving. Future enhancements outlined in the `README.md` include:
- Improved Agent Discovery: Formalizing how auth schemes and (optional) credentials are included in the `AgentCard`.
- Enhanced Collaboration: Investigating methods like `QuerySkill()` for dynamic capability checking.
- Richer Task Lifecycle/UX: Supporting dynamic negotiation of interaction modes (e.g., adding audio mid-task).
- Protocol Extensions: Exploring client-initiated methods beyond task management and improving streaming/push-notification reliability.
- Better Samples & Docs: Continuously improving examples and documentation clarity.
Conclusion: Embrace the Interoperable Future
The Agent2Agent protocol is more than just a technical specification; it's a foundational piece for building the next generation of intelligent, collaborative AI systems. As AI agents become more specialized and diverse, the need for a standard communication protocol becomes paramount. A2A provides a robust, open, and flexible solution to the critical challenge of interoperability.
By enabling agents built on different frameworks and platforms to discover, communicate, and coordinate tasks effectively, A2A unlocks new possibilities for complex workflow automation, sophisticated multi-agent applications, and a more modular, scalable approach to AI development.
The time to learn and adopt A2A is now. Whether you're building individual agents, designing enterprise AI solutions, or contributing to AI frameworks, understanding and leveraging A2A will be crucial for navigating the increasingly interconnected future of artificial intelligence. Explore the specification, experiment with the samples, and consider contributing to this vital open standard.