What Is the Agent2Agent Protocol (A2A) and Why You Must Learn It Now

Community Article Published April 12, 2025

The landscape of artificial intelligence is rapidly evolving. We're moving beyond single, monolithic AI models towards complex ecosystems of specialized AI agents. These agents, often built using different frameworks, by different teams, or even different vendors, need to collaborate to solve increasingly sophisticated problems. However, enabling seamless communication and interoperability between these diverse and often opaque agentic applications presents a significant hurdle. This is precisely the challenge the Agent2Agent (A2A) protocol aims to solve.

Developed as an open initiative driven by Google, the A2A protocol provides a standardized communication layer, a lingua franca, enabling agents built on disparate platforms to discover each other's capabilities, negotiate interactions, exchange information, and work together securely and effectively.

In a world increasingly reliant on multi-agent systems for tasks ranging from complex data analysis and workflow automation to sophisticated customer service and creative generation, the ability for these agents to interoperate is no longer a 'nice-to-have' but a critical necessity. Understanding and implementing A2A is becoming essential for developers, architects, and organizations looking to build scalable, flexible, and powerful AI solutions.

This article dives deep into the Agent2Agent protocol. We'll explore its core concepts, dissect its technical specifications, walk through practical implementation examples with code tutorials, and discuss why mastering A2A now is crucial for anyone involved in the future of AI development.

The Problem: Siloed Agents in a Collaborative World

Imagine an enterprise workflow:

  1. A customer service chatbot (Agent A, built on Framework X) identifies a complex technical issue.
  2. It needs to escalate this to a specialized diagnostic agent (Agent B, built internally using Framework Y).
  3. Agent B analyzes logs and determines a specific software patch is needed.
  4. It needs to instruct a deployment agent (Agent C, a third-party service using Framework Z) to apply the patch to the customer's specific environment.

Without a common communication standard, integrating these three agents is a complex, bespoke, and brittle process. Developers would need to write custom adapters and translation layers for each agent-pair interaction. Each new agent added, or any update to an existing agent's API, could break the entire chain. The result is a fragmented system, difficult to maintain, scale, or adapt.

This "silo" problem hinders innovation and limits the potential of multi-agent systems. We need a way for agents to:

  • Discover: Find other agents and understand what they can do.
  • Communicate: Exchange information (text, data, files) in a structured way.
  • Coordinate: Manage tasks involving multiple steps and potentially multiple agents.
  • Negotiate: Agree on interaction modalities (e.g., text, forms, audio).
  • Secure: Interact safely, respecting authentication and authorization.

The Solution: Agent2Agent (A2A) Protocol

The A2A protocol addresses these challenges by defining a standard set of rules and message formats for inter-agent communication. It acts as an abstraction layer, hiding the underlying implementation details of individual agents.

Key Principles of A2A:

  1. Openness: A2A is an open protocol, fostering collaboration and preventing vendor lock-in. Anyone can implement or contribute to it.
  2. Interoperability: Its primary goal is to allow agents built with any framework (LangChain, CrewAI, Google ADK, Autogen, custom builds, etc.) or vendor platform to communicate.
  3. Task-Oriented: Communication revolves around asynchronous "Tasks," allowing for long-running operations and clear tracking of work units.
  4. Capability Discovery: Agents advertise their capabilities, skills, and supported interaction modes via a standardized "Agent Card."
  5. Rich Data Exchange: Supports various data types, including text, structured data (JSON, forms), and files (inline or via URI).
  6. Flexibility: Accommodates different interaction patterns, including simple request-response, streaming updates (via Server-Sent Events), and push notifications (webhooks).
  7. Security: Incorporates mechanisms for agents to declare their required authentication schemes.

By adopting A2A, developers can build modular and composable AI systems where agents can be swapped, updated, or added with minimal friction, significantly accelerating development and improving maintainability.

Why Learn A2A Now? The Urgency for Interoperability

The shift towards multi-agent architectures is accelerating. Here's why understanding A2A is becoming critical:

  1. The Rise of Specialized Agents: General-purpose LLMs are powerful, but complex tasks often benefit from smaller, specialized agents working in concert. A2A is the glue that binds them.
  2. Enterprise AI Adoption: Businesses are moving beyond experimentation to integrate AI into core processes. This requires robust, scalable, and maintainable solutions, which A2A facilitates through standardized communication.
  3. Avoiding Vendor Lock-in: Relying solely on a single vendor's agent platform creates dependencies. A2A promotes flexibility, allowing organizations to leverage best-of-breed agents from various sources.
  4. Future-Proofing Applications: Building with interoperability in mind from the start makes applications more adaptable to future changes in AI technology and frameworks.
  5. Enabling Complex Workflows: Many valuable AI applications involve sequences of tasks handled by different agents (like the customer service example). A2A provides the necessary orchestration framework.
  6. Foundation for Agent Collaboration: As research into agent collaboration, negotiation, and dynamic team formation progresses, standardized protocols like A2A will be fundamental.

Learning A2A now positions you at the forefront of this paradigm shift, equipping you with the skills to build the next generation of collaborative AI systems.

A2A Core Concepts: A Technical Deep Dive

The A2A protocol is built upon established web standards, primarily using JSON-RPC 2.0 over HTTP(S) for request/response interactions and Server-Sent Events (SSE) for streaming. Let's break down the fundamental components:

1. Agent Card (/.well-known/agent.json)

This is the public "business card" of an A2A-compliant agent. It's a JSON file, typically hosted at a well-known URL (/.well-known/agent.json) relative to the agent's base URL, that describes the agent to potential clients.

  • Purpose: Discovery. Clients fetch this card to learn about an agent before interacting with it.
  • Key Fields (from specification/json/a2a.json and samples/python/common/types.py):
    • name (string, required): Human-readable name of the agent.
    • description (string, optional): A brief description of the agent's purpose.
    • url (string, required): The HTTP(S) endpoint where the agent listens for A2A requests (JSON-RPC).
    • version (string, required): The version of the agent implementation or the A2A spec it adheres to.
    • capabilities (object, required): Describes the agent's protocol-level features.
      • streaming (boolean, default: false): Does the agent support tasks/sendSubscribe via SSE?
      • pushNotifications (boolean, default: false): Does the agent support sending updates via webhooks?
      • stateTransitionHistory (boolean, default: false): Does the agent include historical status transitions in Task objects?
    • authentication (object, optional): Specifies required authentication.
      • schemes (array of strings, required): Lists supported authentication schemes (e.g., "bearer", "apiKey", custom schemes).
      • credentials (string, optional): Potentially hints about where/how to obtain credentials (use with caution).
    • defaultInputModes / defaultOutputModes (array of strings, optional): Default content types the agent expects/produces (e.g., "text", "application/json", "image/png").
    • skills (array of objects, required): Describes the specific functionalities the agent offers. Each skill has:
      • id (string, required): A unique identifier for the skill.
      • name (string, required): Human-readable name of the skill.
      • description (string, optional): What the skill does.
      • inputModes / outputModes (array of strings, optional): Specific content types supported for this skill.
      • examples (array of strings, optional): Example prompts or use cases.

Example AgentCard Structure (Conceptual):

{
  "name": "Image Generation Agent",
  "description": "Generates images based on text prompts.",
  "url": "https://api.example-image-agent.com/a2a",
  "version": "1.0.0",
  "capabilities": {
    "streaming": true,
    "pushNotifications": false,
    "stateTransitionHistory": true
  },
  "authentication": {
    "schemes": ["apiKey"]
  },
  "defaultInputModes": ["text"],
  "defaultOutputModes": ["image/png"],
  "skills": [
    {
      "id": "generate_image",
      "name": "Generate Image",
      "description": "Creates an image from a textual description.",
      "inputModes": ["text"],
      "outputModes": ["image/png"],
      "examples": ["Generate an image of a 'blue cat wearing a top hat'"]
    }
  ]
}

2. A2A Server

An agent application that exposes an HTTP endpoint implementing the A2A protocol methods. It listens for incoming JSON-RPC requests, processes them (often involving its underlying AI logic), manages task lifecycles, and sends back responses or streams updates.

The Python samples (samples/python/common/server/server.py) use Starlette (an ASGI framework) to create the server, mapping the endpoint defined in the AgentCard's url field to a request processing function.

3. A2A Client

Any application or another agent that consumes the services of an A2A Server. It constructs JSON-RPC requests (like tasks/send), sends them to the server's URL, and handles the responses or streamed events.

The Python samples (samples/python/common/client/client.py) provide an A2AClient class that abstracts the HTTP requests (using httpx) and SSE handling (httpx_sse).
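
Before calling an agent, a client typically resolves its Agent Card first. The samples' card_resolver handles this; as a minimal standalone sketch (assuming httpx is installed and the agent hosts its card at the well-known path), discovery might look like this:

# discover_agent.py (Conceptual sketch)
import asyncio
import httpx

from common.types import AgentCard  # Pydantic model from the Python samples

async def fetch_agent_card(base_url: str) -> AgentCard:
    # The card lives at /.well-known/agent.json relative to the agent's base URL
    card_url = f"{base_url.rstrip('/')}/.well-known/agent.json"
    async with httpx.AsyncClient() as http:
        response = await http.get(card_url, timeout=10.0)
        response.raise_for_status()
        data = response.json()
    # Validate the payload against the AgentCard model
    return AgentCard(**data)

async def main():
    card = await fetch_agent_card("https://api.example-image-agent.com")
    print(card.name, card.capabilities.streaming, [skill.id for skill in card.skills])

if __name__ == "__main__":
    asyncio.run(main())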

4. Task

The central concept in A2A. A task represents a unit of work initiated by a client and performed by the server agent.

  • Lifecycle: Tasks are typically long-running and asynchronous.
  • Identification: Each task has a unique id (usually a UUID generated by the client) allowing both client and server to track it. An optional sessionId can group related tasks.
  • State: Tasks progress through defined states (TaskState enum in types.py):
    • submitted: Initial state after client sends the request.
    • working: Agent is actively processing the task.
    • input-required: Agent needs more input from the client to proceed (enables multi-turn conversations).
    • completed: Task finished successfully.
    • failed: Task terminated due to an error.
    • canceled: Task was canceled by the client.
    • unknown: An indeterminate state.
  • Structure (Task model in types.py):
    • id (string)
    • sessionId (string, optional)
    • status (TaskStatus object): Contains the current state, timestamp, and potentially the latest Message from the agent (e.g., requesting input).
    • artifacts (list of Artifact objects, optional): Outputs generated by the task (see below).
    • history (list of Message objects, optional): A record of the conversation turns for this task (client can request a specific length).
    • metadata (dict, optional): Arbitrary key-value data.
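
To make the multi-turn case concrete, here is a conceptual JSON rendering of a Task paused in the input-required state (field names follow the models above; the exact serialization may differ slightly):

{
  "id": "task-42",
  "sessionId": "session-abc",
  "status": {
    "state": "input-required",
    "message": {
      "role": "agent",
      "parts": [
        { "type": "text", "text": "Which environment should I patch: staging or production?" }
      ]
    },
    "timestamp": "2025-04-12T10:30:00Z"
  },
  "artifacts": [],
  "history": [],
  "metadata": null
}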

5. Message

Represents a single turn of communication within a Task.

  • Structure (Message model in types.py):
    • role (enum: "user" or "agent"): Indicates the sender.
    • parts (list of Part objects, required): The content of the message (see below).
    • metadata (dict, optional).

6. Part

The fundamental unit of content within a Message or Artifact. A message can contain multiple parts of different types.

  • Types (Part union in types.py):
    • TextPart: Contains plain text (text field).
    • FilePart: Represents a file. Contains a file object which requires either:
      • bytes: Base64-encoded file content (for small files).
      • uri: A URI pointing to the file location. It can also include name and mimeType.
    • DataPart: Holds structured JSON data (data field). Useful for forms, structured results, etc.
    • Each Part can also have optional metadata.
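
As a rough illustration, a message containing all three part types might serialize like this (a conceptual sketch based on the fields above):

[
  { "type": "text", "text": "Please analyze the attached report." },
  {
    "type": "file",
    "file": {
      "name": "report.pdf",
      "mimeType": "application/pdf",
      "uri": "https://example.com/files/report.pdf"
    }
  },
  {
    "type": "data",
    "data": { "priority": "high", "department": "support" },
    "metadata": { "source": "ticket_form" }
  }
]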

7. Artifact

Represents outputs generated by the agent during task execution, separate from conversational messages. Examples include generated files (code, images, documents) or final structured data results.

  • Structure (Artifact model in types.py):
    • name / description (string, optional)
    • parts (list of Part objects, required): The content of the artifact.
    • metadata (dict, optional)
    • index, append, lastChunk (optional): Used for streaming large artifacts in chunks.
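
For example, a large generated document could arrive as two streamed chunks of the same artifact (a conceptual sketch; exact chunking behavior depends on the implementation):

{
  "name": "summary.md",
  "index": 0,
  "append": false,
  "lastChunk": false,
  "parts": [ { "type": "text", "text": "# Report\nFirst half of the generated content..." } ]
}

{
  "name": "summary.md",
  "index": 0,
  "append": true,
  "lastChunk": true,
  "parts": [ { "type": "text", "text": "...second half of the generated content." } ]
}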

8. Communication Flow (Typical)

  1. Discovery: Client Agent fetches the Server Agent's AgentCard from /.well-known/agent.json. It checks capabilities, authentication requirements, and available skills.
  2. Initiation: Client generates a unique Task ID. It sends an initial Message (role: "user") within a tasks/send (for simple request/response) or tasks/sendSubscribe (for streaming) request to the Server's A2A endpoint URL.
  3. Processing & Updates:
    • Non-Streaming (tasks/send): The Server processes the task synchronously (from the client's perspective, though it might be async internally). It eventually returns a final Task object in the JSON-RPC response, indicating a terminal state (completed, failed).
    • Streaming (tasks/sendSubscribe): The Server immediately acknowledges the request (often with an initial TaskStatusUpdateEvent indicating submitted or working). It then uses Server-Sent Events (SSE) over the same HTTP connection to push updates to the client as the task progresses. These updates can be:
      • TaskStatusUpdateEvent: Notifies changes in the task's state (e.g., working -> input-required, working -> completed).
      • TaskArtifactUpdateEvent: Sends generated Artifact data, potentially chunked.
  4. Interaction (if input-required): If the Server Agent sends a TaskStatusUpdateEvent with state input-required (optionally including a Message asking a question), the Client sends a subsequent Message (role: "user") using the same Task ID via another tasks/send or tasks/sendSubscribe request to provide the needed input.
  5. Completion: The task eventually reaches a terminal state (completed, failed, canceled), indicated in the final Task object (for tasks/send) or the final: true flag in a TaskStatusUpdateEvent (for tasks/sendSubscribe).
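
On the wire, a tasks/sendSubscribe response is delivered as an SSE stream. Conceptually, the client might receive a sequence of events like the following (a simplified sketch; the samples wrap each event in a JSON-RPC response object):

data: {"jsonrpc": "2.0", "result": {"id": "task-42", "status": {"state": "working"}, "final": false}}

data: {"jsonrpc": "2.0", "result": {"id": "task-42", "artifact": {"name": "summary.md", "parts": [{"type": "text", "text": "..."}]}}}

data: {"jsonrpc": "2.0", "result": {"id": "task-42", "status": {"state": "completed"}, "final": true}}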

9. JSON-RPC Methods

A2A defines standard JSON-RPC 2.0 methods for task management:

  • tasks/send: Initiates or continues a task, expects a single Task response upon completion.
  • tasks/sendSubscribe: Initiates or continues a task, expects a stream of SSE updates (TaskStatusUpdateEvent, TaskArtifactUpdateEvent).
  • tasks/get: Retrieves the current state and history of a specific task by its ID.
  • tasks/cancel: Requests cancellation of an ongoing task. Success is not guaranteed (agent might be too far along or not support cancellation).
  • tasks/pushNotification/set: (If supported) Client provides a webhook URL for the server to push task updates to, instead of relying on SSE or polling tasks/get.
  • tasks/pushNotification/get: Retrieves the currently configured push notification settings for a task.
  • tasks/resubscribe: (If supported) Allows a client to reconnect to the SSE stream for an existing task (e.g., after a network interruption).
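
For instance, polling a task's state with tasks/get is a plain JSON-RPC call over HTTP (a conceptual sketch; the optional historyLength parameter asks the server to include recent messages):

curl -X POST https://api.example-image-agent.com/a2a \
     -H "Content-Type: application/json" \
     -d '{
          "jsonrpc": "2.0",
          "method": "tasks/get",
          "id": "req-1",
          "params": {
            "id": "task-42",
            "historyLength": 10
          }
        }'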

Getting Started: Tutorials and Code Examples

Let's make this concrete. We'll use the Python samples provided in the A2A-main/samples/python/ directory as a basis for our tutorials. These samples provide common client (common/client/) and server (common/server/) components, along with specific agent implementations (agents/) and host applications (hosts/).

Prerequisites:

  • Python 3.10+ (as indicated by sample setups)
  • Familiarity with basic Python, async/await, and HTTP concepts.
  • Access to the A2A-main codebase (clone the open-source A2A repository to get the samples/python directory).
  • An environment manager like venv or conda.
  • Installation tool like pip or uv (the samples use uv).

Setting up the Environment (Example using venv and pip):

# Navigate to the python samples directory
cd A2A-main/samples/python

# Create a virtual environment
python -m venv .venv

# Activate the environment
# On macOS/Linux:
source .venv/bin/activate
# On Windows:
# .venv\Scripts\activate

# Install dependencies (assuming a requirements.txt exists or from pyproject.toml)
# If using pyproject.toml (like the samples):
pip install . # Installs dependencies defined in pyproject.toml

# You might need specific API keys (e.g., Google API Key)
# Set them as environment variables or using a .env file
# export GOOGLE_API_KEY="YOUR_API_KEY_HERE"

Tutorial 1: Building a Simple A2A Server (Echo Agent)

We'll create a basic agent that simply echoes back the user's message.

  1. Define the Agent Card: Create a Python script (e.g., echo_server.py) and define the AgentCard.

    # echo_server.py
    import os
    import uvicorn
    import asyncio
    import logging
    from uuid import uuid4
    
    # Assuming common types and server are importable
    # (adjust imports based on your project structure)
    from common.types import (
        AgentCard, AgentCapabilities, AgentSkill, Task, TaskState, TaskStatus,
        Message, TextPart, SendTaskRequest, SendTaskResponse,
        JSONRPCResponse, JSONRPCError, InternalError
    )
    from common.server import A2AServer, TaskManager
    
    # Configure basic logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    # --- Agent Definition ---
    ECHO_AGENT_CARD = AgentCard(
        name="Echo Agent",
        description="A simple A2A agent that echoes back user messages.",
        url="http://localhost:8001/a2a", # Where this server will run
        version="0.1.0",
        capabilities=AgentCapabilities(
            streaming=False, # This simple agent won't stream
            pushNotifications=False,
            stateTransitionHistory=False
        ),
        authentication=None, # No auth for this simple example
        defaultInputModes=["text"],
        defaultOutputModes=["text"],
        skills=[
            AgentSkill(
                id="echo",
                name="Echo Message",
                description="Receives a text message and sends it back.",
                inputModes=["text"],
                outputModes=["text"],
                examples=["'Hello there!' -> 'Hello there!'"]
            )
        ]
    )
    
    # --- Task Management Logic ---
    class EchoTaskManager(TaskManager):
        def __init__(self):
            # Simple in-memory store for tasks
            self.tasks: dict[str, Task] = {}
            self.lock = asyncio.Lock()
    
        async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
            task_params = request.params
            task_id = task_params.id
            user_message = task_params.message
    
            logger.info(f"Received task {task_id} with message: {user_message.parts}")
    
            # Basic validation: Expecting a single TextPart
            if not user_message.parts or not isinstance(user_message.parts[0], TextPart):
                logger.error(f"Task {task_id}: Invalid input - expected TextPart.")
                error = JSONRPCError(code=-32602, message="Invalid input: Expected a single TextPart.")
                return SendTaskResponse(id=request.id, error=error)
    
            user_text = user_message.parts[0].text
    
            # Create the agent's response message
            agent_response_message = Message(
                role="agent",
                parts=[TextPart(text=f"You said: {user_text}")]
            )
    
            # Create the final Task object
            final_task_status = TaskStatus(
                state=TaskState.COMPLETED,
                message=agent_response_message # Include final message in status
            )
            completed_task = Task(
                id=task_id,
                sessionId=task_params.sessionId,
                status=final_task_status,
                artifacts=[], # No artifacts for echo
                history=[user_message, agent_response_message] # Simple history
            )
    
            # Store the completed task (optional for this simple case)
            async with self.lock:
                self.tasks[task_id] = completed_task
    
            logger.info(f"Task {task_id} completed.")
            # Return the completed task in the response
            return SendTaskResponse(id=request.id, result=completed_task)
    
        # --- Implement other required abstract methods (can raise NotImplemented) ---
        async def on_get_task(self, request):
            # Basic implementation for demonstration
            async with self.lock:
                task = self.tasks.get(request.params.id)
            if task:
                return JSONRPCResponse(id=request.id, result=task)
            else:
                error = JSONRPCError(code=-32001, message="Task not found")
                return JSONRPCResponse(id=request.id, error=error)
    
        async def on_cancel_task(self, request):
            error = JSONRPCError(code=-32004, message="Cancel not supported")
            return JSONRPCResponse(id=request.id, error=error)
    
        async def on_send_task_subscribe(self, request):
             error = JSONRPCError(code=-32004, message="Streaming not supported")
             return JSONRPCResponse(id=request.id, error=error)
    
        async def on_set_task_push_notification(self, request):
            error = JSONRPCError(code=-32003, message="Push notifications not supported")
            return JSONRPCResponse(id=request.id, error=error)
    
        async def on_get_task_push_notification(self, request):
            error = JSONRPCError(code=-32003, message="Push notifications not supported")
            return JSONRPCResponse(id=request.id, error=error)
    
        async def on_resubscribe_to_task(self, request):
             error = JSONRPCError(code=-32004, message="Resubscribe not supported")
             return JSONRPCResponse(id=request.id, error=error)
    
    
    # --- Server Setup ---
    if __name__ == "__main__":
        task_manager = EchoTaskManager()
        server = A2AServer(
            host="localhost",
            port=8001,
            endpoint="/a2a", # Matches AgentCard URL path
            agent_card=ECHO_AGENT_CARD,
            task_manager=task_manager
        )
        print("Starting Echo A2A Server on http://localhost:8001")
        # Use server.start() which calls uvicorn.run
        # Note: For production, use a proper ASGI server like uvicorn or hypercorn directly
        server.start()
        # Alternatively, run directly with uvicorn:
        # uvicorn.run(server.app, host="localhost", port=8001)
    
  2. Run the Server:

    python echo_server.py
    

    You should see output indicating the server has started on http://localhost:8001.

  3. Test with curl (or create a client): You can send a JSON-RPC request using curl.

    curl -X POST http://localhost:8001/a2a \
         -H "Content-Type: application/json" \
         -d '{
              "jsonrpc": "2.0",
              "method": "tasks/send",
              "id": "my-echo-task-123",
              "params": {
                "id": "my-echo-task-123",
                "sessionId": "session-abc",
                "message": {
                  "role": "user",
                  "parts": [
                    {
                      "type": "text",
                      "text": "Hello A2A World!"
                    }
                  ]
                }
              }
            }'
    
  4. Expected Response: You should receive a JSON-RPC response containing the completed Task object:

    {
      "jsonrpc": "2.0",
      "id": "my-echo-task-123",
      "result": {
        "id": "my-echo-task-123",
        "sessionId": "session-abc",
        "status": {
          "state": "completed",
          "message": {
            "role": "agent",
            "parts": [
              {
                "type": "text",
                "text": "You said: Hello A2A World!"
              }
            ],
            "metadata": null
          },
          "timestamp": "..." // ISO timestamp
        },
        "artifacts": [],
        "history": [
          {
            "role": "user",
            "parts": [
              {
                "type": "text",
                "text": "Hello A2A World!"
              }
            ],
            "metadata": null
          },
          {
            "role": "agent",
            "parts": [
              {
                "type": "text",
                "text": "You said: Hello A2A World!"
              }
            ],
            "metadata": null
          }
        ],
        "metadata": null
      },
      "error": null
    }
    

This simple example demonstrates the basic request/response flow using tasks/send.

Tutorial 2: Building an A2A Client (Python)

Now, let's create a client to interact with our Echo Server.

# echo_client.py
import asyncio
import logging
from uuid import uuid4

# Assuming common types and client are importable
from common.client import A2AClient, card_resolver # card_resolver might be needed
from common.types import Message, TextPart, AgentCard # Import AgentCard if needed directly

# Configure basic logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

ECHO_SERVER_URL = "http://localhost:8001/a2a" # URL from Echo Server's AgentCard

async def main():
    # In a real scenario, you might fetch the AgentCard first
    # try:
    #   agent_card = await card_resolver.fetch_agent_card(ECHO_SERVER_URL)
    #   client = A2AClient(agent_card=agent_card)
    # except Exception as e:
    #   logger.error(f"Failed to fetch AgentCard or initialize client: {e}")
    #   return

    # For simplicity, we'll use the URL directly
    client = A2AClient(url=ECHO_SERVER_URL)

    task_id = f"echo-task-{uuid4().hex}"
    session_id = f"session-{uuid4().hex}"
    user_text = "Testing the echo client!"

    # Construct the user message
    user_message = Message(
        role="user",
        parts=[TextPart(text=user_text)]
    )

    # Prepare the parameters for tasks/send
    send_params = {
        "id": task_id,
        "sessionId": session_id,
        "message": user_message,
        # Optional: acceptedOutputModes, pushNotification, historyLength, metadata
    }

    try:
        logger.info(f"Sending task {task_id} to {ECHO_SERVER_URL}...")
        # Use the client's send_task method
        # It handles constructing the JSONRPCRequest internally
        response = await client.send_task(payload=send_params)

        if response.error:
            logger.error(f"Task {task_id} failed: {response.error.message} (Code: {response.error.code})")
        elif response.result:
            task_result = response.result
            logger.info(f"Task {task_id} completed with state: {task_result.status.state}")
            if task_result.status.message and task_result.status.message.parts:
                 agent_part = task_result.status.message.parts[0]
                 if isinstance(agent_part, TextPart):
                     logger.info(f"Agent response: {agent_part.text}")
                 else:
                     logger.warning("Agent response was not TextPart")
            else:
                 logger.warning("No message part in agent response status")
        else:
            logger.error(f"Received unexpected response for task {task_id}: {response}")

    except Exception as e:
        logger.error(f"An error occurred while communicating with the agent: {e}")

if __name__ == "__main__":
    asyncio.run(main())

Run the Client:

Ensure the echo_server.py is running first, then run the client:

python echo_client.py

You should see log output showing the task being sent and the echoed response received from the server.

Tutorial 3: Handling Streaming (tasks/sendSubscribe)

Let's modify the server and client to use streaming via SSE. Imagine an agent that provides status updates as it "works".

Server Modifications (streaming_echo_server.py):

# streaming_echo_server.py (Modifications based on echo_server.py)
# ... (Imports similar to echo_server.py, add AsyncIterable)
import time
from typing import AsyncIterable
from common.types import (
    # ... other imports ...
    TaskState, TaskStatus, TaskStatusUpdateEvent, SendTaskStreamingRequest,
    SendTaskStreamingResponse
)
from common.server import A2AServer, TaskManager

# --- Agent Definition (Update Capabilities) ---
STREAMING_ECHO_AGENT_CARD = AgentCard(
    # ... (name, description, url, version, etc. as before) ...
    url="http://localhost:8002/a2a", # Use a different port
    capabilities=AgentCapabilities(
        streaming=True, # <<< Enable streaming capability
        pushNotifications=False,
        stateTransitionHistory=True # Let's include history
    ),
    # ... (rest of the card) ...
)


# --- Task Management Logic (Implement on_send_task_subscribe) ---
class StreamingEchoTaskManager(TaskManager):
    def __init__(self):
        self.tasks: dict[str, Task] = {} # Store task state
        self.lock = asyncio.Lock()
        # NOTE: The common InMemoryTaskManager in samples handles SSE queueing
        #       If NOT using that base class, you'd need SSE queue management here.
        #       For this example, we'll simulate the async generator directly.

    async def _do_work(self, task_id: str, user_text: str) -> AsyncIterable[SendTaskStreamingResponse]:
        logger.info(f"Task {task_id}: Starting work...")

        # 1. Send 'working' status update
        working_status = TaskStatus(state=TaskState.WORKING)
        yield SendTaskStreamingResponse(
            id=None, # SSE events don't need request ID
            result=TaskStatusUpdateEvent(id=task_id, status=working_status)
        )

        await asyncio.sleep(1) # Simulate work

        # 2. Simulate some progress (optional)
        progress_status = TaskStatus(state=TaskState.WORKING, message=Message(role="agent", parts=[TextPart(text="Thinking...")]))
        yield SendTaskStreamingResponse(
            id=None,
            result=TaskStatusUpdateEvent(id=task_id, status=progress_status)
        )

        await asyncio.sleep(2) # Simulate more work

        # 3. Send 'completed' status update with final message
        agent_response_message = Message(
            role="agent",
            parts=[TextPart(text=f"You said (streamed): {user_text}")]
        )
        completed_status = TaskStatus(
            state=TaskState.COMPLETED,
            message=agent_response_message
        )
        yield SendTaskStreamingResponse(
            id=None,
            result=TaskStatusUpdateEvent(id=task_id, status=completed_status, final=True) # Mark as final
        )

        logger.info(f"Task {task_id}: Work completed.")


    async def on_send_task_subscribe(
        self, request: SendTaskStreamingRequest
    ) -> AsyncIterable[SendTaskStreamingResponse]:
        task_params = request.params
        task_id = task_params.id
        user_message = task_params.message
        logger.info(f"Received streaming task {task_id}")

        # Validate input
        if not user_message.parts or not isinstance(user_message.parts[0], TextPart):
             logger.error(f"Task {task_id}: Invalid input.")
             error = JSONRPCError(code=-32602, message="Invalid input")
             # Cannot yield an error directly in SSE stream start easily with this structure
             # A real server might handle this differently (e.g., immediate error response
             # before starting stream, or an error event in the stream).
             # For simplicity, log and return empty stream or raise
             raise ValueError("Invalid input for streaming task") # Or return empty async iter

        user_text = user_message.parts[0].text

        # Store initial task state (optional but good practice)
        initial_task = Task(
            id=task_id,
            sessionId=task_params.sessionId,
            status=TaskStatus(state=TaskState.SUBMITTED), # Or WORKING immediately
            history=[user_message]
        )
        async with self.lock:
            self.tasks[task_id] = initial_task

        # Return the async generator that yields SSE events
        return self._do_work(task_id, user_text)

    # --- Implement other methods (on_send_task, on_get_task etc.) ---
    # ... (Implementations similar to EchoTaskManager, adapting for streaming context
    #      or raising NotImplemented if only streaming is supported) ...
    async def on_send_task(self, request):
         error = JSONRPCError(code=-32004, message="Use tasks/sendSubscribe for this agent")
         return SendTaskResponse(id=request.id, error=error)
    # ... other methods ...


# --- Server Setup ---
if __name__ == "__main__":
    task_manager = StreamingEchoTaskManager()
    server = A2AServer(
        host="localhost",
        port=8002, # Different port
        endpoint="/a2a",
        agent_card=STREAMING_ECHO_AGENT_CARD,
        task_manager=task_manager
    )
    print("Starting Streaming Echo A2A Server on http://localhost:8002")
    server.start()

Client Modifications (streaming_echo_client.py):

# streaming_echo_client.py (Modifications based on echo_client.py)
import asyncio
import logging
from uuid import uuid4

from common.client import A2AClient
from common.types import Message, TextPart, TaskStatusUpdateEvent, TaskArtifactUpdateEvent

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

STREAMING_ECHO_SERVER_URL = "http://localhost:8002/a2a"

async def main():
    client = A2AClient(url=STREAMING_ECHO_SERVER_URL)

    task_id = f"stream-task-{uuid4().hex}"
    user_text = "Testing the streaming client!"

    user_message = Message(role="user", parts=[TextPart(text=user_text)])

    send_params = {
        "id": task_id,
        "message": user_message,
    }

    try:
        logger.info(f"Sending streaming task {task_id} to {STREAMING_ECHO_SERVER_URL}...")

        # Use the client's send_task_streaming method
        async for response in client.send_task_streaming(payload=send_params):
            if response.error:
                # Errors might be sent as part of the stream in some implementations
                logger.error(f"Received error in stream for task {task_id}: {response.error.message}")
                break # Stop processing stream on error

            elif response.result:
                event = response.result
                if isinstance(event, TaskStatusUpdateEvent):
                    logger.info(f"Task {task_id} Status Update: {event.status.state}")
                    if event.status.message and event.status.message.parts:
                         part = event.status.message.parts[0]
                         if isinstance(part, TextPart):
                             logger.info(f"  Agent Message: {part.text}")
                    if event.final:
                        logger.info(f"Task {task_id} reached final state.")
                        break # Exit loop once task is final

                elif isinstance(event, TaskArtifactUpdateEvent):
                     logger.info(f"Task {task_id} Artifact Update: {event.artifact.name}")
                     # Process artifact parts...
                else:
                     logger.warning(f"Received unknown event type in stream: {type(event)}")
            else:
                logger.error(f"Received unexpected empty response in stream for task {task_id}")


    except Exception as e:
        logger.error(f"An error occurred during streaming communication: {e}")

if __name__ == "__main__":
    asyncio.run(main())

Run: Start streaming_echo_server.py, then run streaming_echo_client.py. You'll see the client log status updates (WORKING, COMPLETED) and the final echoed message as they are streamed from the server.

Tutorial 4: Working with Different Data Parts (FilePart, DataPart)

A2A's strength lies in handling diverse data.

  • Sending Files (FilePart):

    • Client: When creating the Message or Artifact, add a FilePart.
      • For small files: Read content, base64 encode it, and put it in FilePart(file=FileContent(bytes=encoded_content, name="...", mimeType="...")).
      • For large files/external references: Use FilePart(file=FileContent(uri="file:///path/to/local/file", name="...", mimeType="...")) or FilePart(file=FileContent(uri="https://example.com/remote/file", name="...", mimeType="...")). The server needs to be able to resolve and access the URI.
    • Server: In the task handler, check message.parts or artifact.parts. If a FilePart is found, decode the bytes or fetch the content from the uri (a server-side sketch follows the client example below).
  • Sending Structured Data (DataPart):

    • Client: Create a dictionary with the structured data. Add DataPart(data=my_dict) to the Message or Artifact.
    • Server: Check parts for DataPart. Access the dictionary via part.data. Useful for sending JSON payloads, form data, or configuration.

Example Snippet (Client sending different parts):

# client_sending_parts.py (Conceptual)
from common.types import Message, TextPart, FilePart, DataPart, FileContent
import base64

# ... client setup ...

# 1. Text
text_part = TextPart(text="Analyze the attached data and file.")

# 2. Structured Data
json_data = {"user_id": 123, "preferences": {"theme": "dark", "notifications": True}}
data_part = DataPart(data=json_data, metadata={"source": "user_profile"})

# 3. File (Inline - small file)
try:
    with open("report.txt", "rb") as f:
        file_bytes = f.read()
        encoded_bytes = base64.b64encode(file_bytes).decode('utf-8')
    file_part_inline = FilePart(
        file=FileContent(bytes=encoded_bytes, name="report.txt", mimeType="text/plain")
    )
except Exception as e:
    logger.error(f"Failed to read inline file: {e}")
    file_part_inline = None

# 4. File (URI - assumes server can access)
file_part_uri = FilePart(
    file=FileContent(uri="file:///shared/data/large_dataset.csv", name="large_dataset.csv", mimeType="text/csv")
)


parts_list = [text_part, data_part]
if file_part_inline:
    parts_list.append(file_part_inline)
parts_list.append(file_part_uri)

user_message = Message(role="user", parts=parts_list)

# ... send message using client.send_task(...) ...
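
On the server side, the task handler can branch on the part type it receives. Here is a minimal sketch of that dispatch, assuming the same common.types models and that URI-referenced files are fetched with httpx:

# server_handling_parts.py (Conceptual)
import base64
import httpx

from common.types import Message, TextPart, FilePart, DataPart

async def process_parts(message: Message) -> None:
    for part in message.parts:
        if isinstance(part, TextPart):
            # Plain text instruction from the user
            print(f"Text: {part.text}")
        elif isinstance(part, DataPart):
            # Structured JSON payload, e.g. form data or configuration
            print(f"Structured data: {part.data}")
        elif isinstance(part, FilePart):
            if part.file.bytes:
                # Inline file: decode the base64-encoded content
                content = base64.b64decode(part.file.bytes)
            elif part.file.uri and part.file.uri.startswith(("http://", "https://")):
                # Remote file: fetch it (the server must be able to resolve the URI)
                async with httpx.AsyncClient() as http:
                    response = await http.get(part.file.uri)
                    response.raise_for_status()
                    content = response.content
            else:
                raise ValueError(f"Unsupported file reference: {part.file.uri}")
            print(f"Received file {part.file.name} ({len(content)} bytes)")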

Advanced Topics & Use Cases

  • Agent Discovery: The /.well-known/agent.json endpoint is the standard mechanism. Clients should fetch this first. Implementations might include caching strategies. The card_resolver.py in the Python samples hints at this.
  • Authentication: The AgentCard declares supported schemes. The client must choose a compatible scheme and include the necessary credentials (e.g., Bearer token in Authorization header, API key in header/query param) when making HTTP requests to the A2A url. The A2A protocol itself doesn't dictate how auth is implemented, only how it's advertised. Secure handling of credentials is vital.
  • Error Handling: A2A uses standard JSON-RPC 2.0 error codes (-32700 Parse Error, -32600 Invalid Request, -32601 Method Not Found, -32602 Invalid Params, -32603 Internal Error) plus custom codes for A2A-specific issues (e.g., -32001 Task Not Found, -32002 Task Not Cancelable). Servers should return appropriate error objects, and clients must handle them gracefully. The sample A2AServer shows basic error mapping.
  • Push Notifications: For scenarios where SSE is unsuitable (e.g., client behind restrictive firewall, very long-lived tasks where keeping connection open is costly), push notifications offer an alternative.
    1. Client calls tasks/pushNotification/set, providing its own webhook url, an optional token for verification, and potentially authentication details needed for the server to call the client's webhook.
    2. Server stores this configuration for the task.
    3. When a task update occurs, the Server makes an HTTP POST request to the client's webhook url, sending the TaskStatusUpdateEvent or TaskArtifactUpdateEvent in the request body (potentially verifying using the provided token).
    4. Client's webhook endpoint receives the update (a minimal receiver sketch follows this list).
  • Integration with Frameworks: The true power of A2A shines when bridging different agent frameworks. The A2A-main/samples/ directory shows examples for:
    • Google Agent Development Kit (ADK)
    • CrewAI
    • LangGraph
    • Genkit (JavaScript)
    These samples typically involve wrapping the framework's agent logic within an A2A Server implementation (like the TaskManager) and using the A2AClient to call other A2A agents.
  • Enterprise Readiness: A2A is designed with enterprise needs in mind, supporting security declarations, asynchronous operations suitable for complex workflows, and clear interfaces for monitoring and management (via tasks/get).
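
To illustrate the receiving end of push notifications, here is a minimal sketch of a client-side webhook built with Starlette (the same ASGI framework the sample server uses). The header name and token check are assumptions about one way a client might verify pushed events, not a prescribed scheme:

# push_webhook.py (Conceptual client-side webhook receiver)
import uvicorn
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

EXPECTED_TOKEN = "shared-secret-token"  # Assumed: token registered via tasks/pushNotification/set

async def task_updates(request: Request) -> JSONResponse:
    # Assumed verification: the server echoes the registered token in a header
    if request.headers.get("X-A2A-Notification-Token") != EXPECTED_TOKEN:
        return JSONResponse({"error": "invalid token"}, status_code=401)

    event = await request.json()
    # The body carries a TaskStatusUpdateEvent or TaskArtifactUpdateEvent
    task_id = event.get("id")
    state = event.get("status", {}).get("state")
    print(f"Push update for task {task_id}: state={state}")
    return JSONResponse({"ok": True})

app = Starlette(routes=[Route("/a2a/notifications", task_updates, methods=["POST"])])

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=9000)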

The Future of A2A

The A2A protocol is actively evolving. Future enhancements outlined in the README.md include:

  • Improved Agent Discovery: Formalizing how auth schemes and credentials (optional) are included in the AgentCard.
  • Enhanced Collaboration: Investigating methods like QuerySkill() for dynamic capability checking.
  • Richer Task Lifecycle/UX: Supporting dynamic negotiation of interaction modes (e.g., adding audio mid-task).
  • Protocol Extensions: Exploring client-initiated methods beyond task management and improving streaming/push reliability.
  • Better Samples & Docs: Continuously improving examples and documentation clarity.

Conclusion: Embrace the Interoperable Future

The Agent2Agent protocol is more than just a technical specification; it's a foundational piece for building the next generation of intelligent, collaborative AI systems. As AI agents become more specialized and diverse, the need for a standard communication protocol becomes paramount. A2A provides a robust, open, and flexible solution to the critical challenge of interoperability.

By enabling agents built on different frameworks and platforms to discover, communicate, and coordinate tasks effectively, A2A unlocks new possibilities for complex workflow automation, sophisticated multi-agent applications, and a more modular, scalable approach to AI development.

The time to learn and adopt A2A is now. Whether you're building individual agents, designing enterprise AI solutions, or contributing to AI frameworks, understanding and leveraging A2A will be crucial for navigating the increasingly interconnected future of artificial intelligence. Explore the specification, experiment with the samples, and consider contributing to this vital open standard.
