import logging import gradio as gr import aiohttp import asyncio import threading import logging from gradio import ChatInterface from fastmcp import Client from typing import Dict from query_blogger_mcp_server.server import run_mcp_server_in_http_mode from query_blogger_mcp_server.config import settings logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize MCP client MCP_SERVER_URL = f"http://localhost:{settings.UVICORN_PORT}/mcp" mcp_client = Client(MCP_SERVER_URL) async def ping_mcp_server(): async with mcp_client: await mcp_client.ping() async def a_get_blog_info_by_url(blog_url: str) -> Dict: try: logger.info(f"Fetching blog info for {blog_url}...") async with mcp_client: response = await mcp_client.call_tool("get_blog_info_by_url", {"blog_url": blog_url}) logger.info(f"Received response for {blog_url}: {response}") return response except Exception as e: logger.error(f"Error fetching blog info for {blog_url}: {e}") return {"error": f"Error fetching blog info for {blog_url}: {e}"} def get_blog_info_by_url(blog_url: str) -> Dict: # Wrap Async Function with asyncio.run return asyncio.run(a_get_blog_info_by_url(blog_url)) async def a_get_recent_posts(blog_url: str, num_posts: int, include_content: bool) -> Dict: try: logger.info(f"Fetching recent posts for {blog_url}...") async with mcp_client: response = await mcp_client.call_tool("get_recent_posts", { "blog_url": blog_url, "num_posts": num_posts, "include_content": include_content }) logger.info(f"Received response for {blog_url}: {response}") return response except Exception as e: logger.error(f"Error fetching recent posts for {blog_url}: {e}") return {"error": f"Error fetching recent posts for {blog_url}: {e}"} def get_recent_posts(blog_url: str, num_posts: int, include_content: bool) -> Dict: # Wrap Async Function with asyncio.run return asyncio.run(a_get_recent_posts(blog_url, num_posts, include_content)) async def a_list_recent_posts(blog_url: str, num_posts: int) -> Dict: try: logger.info(f"Listing recent posts for {blog_url}...") async with mcp_client: response = await mcp_client.call_tool("list_recent_posts", { "blog_url": blog_url, "num_posts": num_posts }) logger.info(f"Received response for {blog_url}: {response}") return response except Exception as e: logger.error(f"Error listing recent posts for {blog_url}: {e}") return {"error": f"Error listing recent posts for {blog_url}: {e}"} def list_recent_posts(blog_url: str, num_posts: int) -> Dict: # Wrap Async Function with asyncio.run return asyncio.run(a_list_recent_posts(blog_url, num_posts)) async def a_search_posts(blog_url: str, query_terms: str, num_posts: int) -> Dict: try: logger.info(f"Searching posts for {blog_url} with query '{query_terms}'...") async with mcp_client: response = await mcp_client.call_tool("search_posts", { "blog_url": blog_url, "query_terms": query_terms, "num_posts": num_posts }) logger.info(f"Received response for {blog_url}: {response}") return response except Exception as e: logger.error(f"Error searching posts for {blog_url}: {e}") return {"error": f"Error searching posts for {blog_url}: {e}"} def search_posts(blog_url: str, query_terms: str, num_posts: int) -> Dict: # Wrap Async Function with asyncio.run return asyncio.run(a_search_posts(blog_url, query_terms, num_posts)) # Simple MCP client using gradio's built-in support def build_gradio_interface(): """Create an interface that connects to your MCP server""" with gr.Blocks(title="Query Blogger MCP Server") as ui: gr.Markdown("# Query Blogger MCP Server") with gr.Accordion("About Blog", open=True): gr.Markdown("## Get Blog Info by URL") with gr.Column(scale=2): # Widget 1 in_blog_url = gr.Textbox(lines=1, label="Blog URL", value="https://blog.codonomics.com") out_blog_info = gr.JSON(label="Output") btn_get_blog_info = gr.Button("Get Blog Info") btn_get_blog_info.click( fn=get_blog_info_by_url, inputs=[in_blog_url], outputs=[out_blog_info] ) with gr.Accordion("Recent Posts", open=False): gr.Markdown("## Get recent posts") with gr.Column(scale=2): # Widget 2 in_blog_url_2 = gr.Textbox(lines=1, label="Blog URL", value="https://blog.codonomics.com") in_num_posts = gr.Number(label="Number of posts", value=3, minimum=1, maximum=5, step=1, interactive=True, key="num_posts") in_include_content = gr.Checkbox(label="Include content", value=True, interactive=True, key="include_content") out_recent_posts = gr.JSON(label="Output") btn_get_recent_posts = gr.Button("Get recent posts") btn_get_recent_posts.click( fn=get_recent_posts, inputs=[in_blog_url_2, in_num_posts, in_include_content], outputs=[out_recent_posts] ) gr.Markdown("## List recent posts") with gr.Column(scale=2): # Widget 3 in_blog_url_3 = gr.Textbox(lines=1, label="Blog URL", value="https://blog.codonomics.com") in_num_posts_3 = gr.Number(label="Number of posts", value=3, minimum=1, maximum=10, step=1, interactive=True, key="num_posts") out_recent_posts_3 = gr.JSON(label="Output") btn_get_recent_posts_3 = gr.Button("List recent posts") btn_get_recent_posts_3.click( fn=list_recent_posts, inputs=[in_blog_url_3, in_num_posts_3], outputs=[out_recent_posts_3] ) with gr.Accordion("Search Posts", open=False): gr.Markdown("## Search Posts") with gr.Column(scale=2): # Widget 4 in_blog_url_4 = gr.Textbox(lines=1, label="Blog URL", value="https://blog.codonomics.com") in_query_terms = gr.Textbox(lines=1, label="Query Terms", value="AI") in_num_posts_4 = gr.Number(label="Number of posts", value=3, minimum=1, maximum=10, step=1, interactive=True, key="num_posts") out_search_results = gr.JSON(label="Output") btn_search_posts = gr.Button("Search Posts") btn_search_posts.click( fn=search_posts, inputs=[in_blog_url_4, in_query_terms, in_num_posts_4], outputs=[out_search_results] ) return ui def run_gradio(): """Run Gradio in a separate thread (no asyncio conflict).""" logger.info("Starting Gradio interface...") demo = build_gradio_interface() demo.launch( debug=True, server_name="0.0.0.0", server_port=7860, share=False ) def start_mcp_server_async(): """Start the MCP server in a separate thread with its own event loop.""" logger.info("Starting MCP server in async mode...") asyncio.run(run_mcp_server_in_http_mode()) async def wait_for_mcp_server(host, port, max_retries=30): """Wait for MCP server to be ready.""" for i in range(max_retries): try: async with aiohttp.ClientSession() as session: async with session.get(f"http://{host}:{port}/health", timeout=5) as response: if response.status == 200: logger.info("✅ MCP Server is ready!") return True except Exception: pass logger.info(f"Waiting for MCP server... ({i+1}/{max_retries})") await asyncio.sleep(1) logger.warning("❌ MCP Server not ready after waiting") return False async def main(): """Main function to start both servers.""" # Async start MCP server in background thread with its own event loop mcp_thread = threading.Thread(target=start_mcp_server_async, daemon=True) mcp_thread.start() # Wait for MCP server to be ready logger.info("Waiting for MCP server to start...") await wait_for_mcp_server(settings.UVICORN_HOST, settings.UVICORN_PORT) await ping_mcp_server() # Create and launch the interface if __name__ == "__main__": asyncio.run(main()) # Start Gradio interface in main thread (no asyncio conflict) run_gradio() # demo = build_gradio_interface() # demo.launch( # debug=True, # server_name="0.0.0.0", # server_port=7860, # share=False # )