a2a-server / original /agent_executor.py
Sheyda Kiani Mehr
revert to test version
d98558a
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.utils import new_agent_text_message
# --8<-- [start:HelloWorldAgent]
class HelloWorldAgent:
"""Hello World Agent."""
async def invoke(self) -> str:
return 'Hello World'
# --8<-- [end:HelloWorldAgent]
# --8<-- [start:HelloWorldAgentExecutor_init]
class HelloWorldAgentExecutor(AgentExecutor):
"""Test AgentProxy Implementation."""
def __init__(self):
self.agent = HelloWorldAgent()
# --8<-- [end:HelloWorldAgentExecutor_init]
# --8<-- [start:HelloWorldAgentExecutor_execute]
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
result = await self.agent.invoke()
await event_queue.enqueue_event(new_agent_text_message(result))
# --8<-- [end:HelloWorldAgentExecutor_execute]
# --8<-- [start:HelloWorldAgentExecutor_cancel]
async def cancel(
self, context: RequestContext, event_queue: EventQueue
) -> None:
raise Exception('cancel not supported')
# --8<-- [end:HelloWorldAgentExecutor_cancel]