from rich.markdown import Markdown
from rich.console import Console
from rich.live import Live
from dotenv import load_dotenv
import asyncio
import pathlib
import sys
import os

from pydantic_ai import Agent
from openai import AsyncOpenAI, OpenAI
from pydantic_ai.models.openai import OpenAIModel

import mcp_client

# Get the directory where the current script is located
SCRIPT_DIR = pathlib.Path(__file__).parent.resolve()

# Define the path to the config file relative to the script directory
CONFIG_FILE = SCRIPT_DIR / "mcp_config.json"

load_dotenv()


def get_model():
    """Build the pydantic-ai OpenAIModel from environment variables.

    Reads MODEL_CHOICE, BASE_URL and LLM_API_KEY, falling back to sensible
    defaults so the script can start (but not necessarily authenticate)
    without a .env file.
    """
    llm = os.getenv('MODEL_CHOICE', 'gpt-4o-mini')
    base_url = os.getenv('BASE_URL', 'https://api.openai.com/v1')
    api_key = os.getenv('LLM_API_KEY', 'no-api-key-provided')

    return OpenAIModel(
        llm,
        base_url=base_url,
        api_key=api_key
    )


async def get_pydantic_ai_agent():
    """Start the MCP client and wrap its tools in a pydantic-ai Agent.

    Returns:
        (client, agent): the started MCPClient (the caller is responsible
        for awaiting ``client.cleanup()`` when done) and an Agent wired
        with the MCP-provided tools.
    """
    client = mcp_client.MCPClient()
    client.load_servers(str(CONFIG_FILE))
    tools = await client.start()
    return client, Agent(model=get_model(), tools=tools)


# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~ Main Function with CLI Chat ~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

async def main():
    """Run an interactive CLI chat loop against the MCP-backed agent."""
    print("=== Pydantic AI MCP CLI Chat ===")
    print("Type 'exit' to quit the chat")

    # Initialize the agent and message history.
    # NOTE: use local names that do NOT shadow the imported `mcp_client`
    # module — the original bound `mcp_client` to the client instance.
    client, agent = await get_pydantic_ai_agent()
    console = Console()
    messages = []

    try:
        while True:
            # Get user input.
            # NOTE(review): input() blocks the event loop; harmless here
            # since nothing else runs concurrently while waiting.
            user_input = input("\n[You] ")

            # Check if user wants to exit
            if user_input.lower() in ['exit', 'quit', 'bye', 'goodbye']:
                print("Goodbye!")
                break

            try:
                # Process the user input and stream the response,
                # rendering partial Markdown as tokens arrive.
                print("\n[Assistant]")
                with Live('', console=console, vertical_overflow='visible') as live:
                    async with agent.run_stream(
                        user_input, message_history=messages
                    ) as result:
                        curr_message = ""
                        async for message in result.stream_text(delta=True):
                            curr_message += message
                            live.update(Markdown(curr_message))

                # Add the new messages to the chat history so the next
                # turn has the full conversation context.
                messages.extend(result.all_messages())
            except Exception as e:
                # Top-level CLI boundary: report and keep the chat alive.
                print(f"\n[Error] An error occurred: {str(e)}")
    finally:
        # Ensure proper cleanup of MCP client resources when exiting
        await client.cleanup()


if __name__ == "__main__":
    asyncio.run(main())