Spaces:
Sleeping
Sleeping
| from a2a.server.agent_execution import AgentExecutor, RequestContext | |
| from a2a.server.events import EventQueue | |
| from a2a.utils import new_agent_text_message | |
| # --8<-- [start:HelloWorldAgent] | |
| class HelloWorldAgent: | |
| """Hello World Agent.""" | |
| async def invoke(self) -> str: | |
| return 'Hello World' | |
| # --8<-- [end:HelloWorldAgent] | |
| # --8<-- [start:HelloWorldAgentExecutor_init] | |
| class HelloWorldAgentExecutor(AgentExecutor): | |
| """Test AgentProxy Implementation.""" | |
| def __init__(self): | |
| self.agent = HelloWorldAgent() | |
| # --8<-- [end:HelloWorldAgentExecutor_init] | |
| # --8<-- [start:HelloWorldAgentExecutor_execute] | |
| async def execute( | |
| self, | |
| context: RequestContext, | |
| event_queue: EventQueue, | |
| ) -> None: | |
| result = await self.agent.invoke() | |
| await event_queue.enqueue_event(new_agent_text_message(result)) | |
| # --8<-- [end:HelloWorldAgentExecutor_execute] | |
| # --8<-- [start:HelloWorldAgentExecutor_cancel] | |
| async def cancel( | |
| self, context: RequestContext, event_queue: EventQueue | |
| ) -> None: | |
| raise Exception('cancel not supported') | |
| # --8<-- [end:HelloWorldAgentExecutor_cancel] | |