|
import logging |
|
import gradio as gr |
|
import aiohttp |
|
import asyncio |
|
import threading |
|
import logging |
|
from gradio import ChatInterface |
|
from fastmcp import Client |
|
from typing import Dict |
|
from query_blogger_mcp_server.server import run_mcp_server_in_http_mode |
|
from query_blogger_mcp_server.config import settings |
|
|
|
# Configure the root logger so INFO-level records from this script are emitted.
logging.basicConfig(level=logging.INFO)

# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)

# HTTP endpoint of the MCP server; the port comes from the project settings.
MCP_SERVER_URL = f"http://localhost:{settings.UVICORN_PORT}/mcp"

# Single fastmcp client instance shared by all tool-calling helpers below.
mcp_client = Client(MCP_SERVER_URL)
|
|
|
async def ping_mcp_server():
    """Enter the shared MCP client context and send one ping to the server.

    Any exception raised while entering the client context or by ``ping()``
    propagates to the caller.
    """
    async with mcp_client:
        await mcp_client.ping()
|
|
|
async def a_get_blog_info_by_url(blog_url: str) -> Dict:
    """Invoke the ``get_blog_info_by_url`` MCP tool for a single blog.

    Args:
        blog_url: URL of the blog, forwarded as the tool's ``blog_url``
            argument.

    Returns:
        The object returned by ``mcp_client.call_tool`` on success. If any
        exception is raised, a dict with a single ``"error"`` key holding a
        human-readable description of the failure.
        NOTE(review): the success value is annotated as ``Dict`` but its
        real type is whatever ``call_tool`` yields -- confirm.
    """
    try:
        logger.info(f"Fetching blog info for {blog_url}...")
        async with mcp_client:
            response = await mcp_client.call_tool("get_blog_info_by_url", {"blog_url": blog_url})
            logger.info(f"Received response for {blog_url}: {response}")
            return response
    except Exception as e:
        # UI boundary: report the failure as data instead of raising, but
        # keep the traceback in the log so the root cause is not lost.
        message = f"Error fetching blog info for {blog_url}: {e}"
        logger.exception(message)
        return {"error": message}
|
|
|
def get_blog_info_by_url(blog_url: str) -> Dict:
    """Blocking wrapper around ``a_get_blog_info_by_url``.

    Runs the coroutine to completion with ``asyncio.run`` and returns its
    result unchanged.
    """
    pending_call = a_get_blog_info_by_url(blog_url)
    return asyncio.run(pending_call)
|
|
|
async def a_get_recent_posts(blog_url: str, num_posts: int, include_content: bool) -> Dict:
    """Invoke the ``get_recent_posts`` MCP tool.

    Args:
        blog_url: URL of the blog, forwarded as ``blog_url``.
        num_posts: Forwarded as the tool's ``num_posts`` argument.
        include_content: Forwarded as the tool's ``include_content``
            argument.

    Returns:
        The object returned by ``mcp_client.call_tool`` on success. If any
        exception is raised, a dict with a single ``"error"`` key holding a
        human-readable description of the failure.
    """
    try:
        logger.info(f"Fetching recent posts for {blog_url}...")
        async with mcp_client:
            response = await mcp_client.call_tool("get_recent_posts", {
                "blog_url": blog_url,
                "num_posts": num_posts,
                "include_content": include_content,
            })
            logger.info(f"Received response for {blog_url}: {response}")
            return response
    except Exception as e:
        # UI boundary: report the failure as data instead of raising, but
        # keep the traceback in the log so the root cause is not lost.
        message = f"Error fetching recent posts for {blog_url}: {e}"
        logger.exception(message)
        return {"error": message}
|
|
|
def get_recent_posts(blog_url: str, num_posts: int, include_content: bool) -> Dict:
    """Blocking wrapper around ``a_get_recent_posts``.

    Runs the coroutine to completion with ``asyncio.run`` and returns its
    result unchanged.
    """
    pending_call = a_get_recent_posts(blog_url, num_posts, include_content)
    return asyncio.run(pending_call)
|
|
|
async def a_list_recent_posts(blog_url: str, num_posts: int) -> Dict:
    """Invoke the ``list_recent_posts`` MCP tool.

    Args:
        blog_url: URL of the blog, forwarded as ``blog_url``.
        num_posts: Forwarded as the tool's ``num_posts`` argument.

    Returns:
        The object returned by ``mcp_client.call_tool`` on success. If any
        exception is raised, a dict with a single ``"error"`` key holding a
        human-readable description of the failure.
    """
    try:
        logger.info(f"Listing recent posts for {blog_url}...")
        async with mcp_client:
            response = await mcp_client.call_tool("list_recent_posts", {
                "blog_url": blog_url,
                "num_posts": num_posts,
            })
            logger.info(f"Received response for {blog_url}: {response}")
            return response
    except Exception as e:
        # UI boundary: report the failure as data instead of raising, but
        # keep the traceback in the log so the root cause is not lost.
        message = f"Error listing recent posts for {blog_url}: {e}"
        logger.exception(message)
        return {"error": message}
|
|
|
def list_recent_posts(blog_url: str, num_posts: int) -> Dict:
    """Blocking wrapper around ``a_list_recent_posts``.

    Runs the coroutine to completion with ``asyncio.run`` and returns its
    result unchanged.
    """
    pending_call = a_list_recent_posts(blog_url, num_posts)
    return asyncio.run(pending_call)
|
|
|
async def a_search_posts(blog_url: str, query_terms: str, num_posts: int) -> Dict:
    """Invoke the ``search_posts`` MCP tool.

    Args:
        blog_url: URL of the blog, forwarded as ``blog_url``.
        query_terms: Forwarded as the tool's ``query_terms`` argument.
        num_posts: Forwarded as the tool's ``num_posts`` argument.

    Returns:
        The object returned by ``mcp_client.call_tool`` on success. If any
        exception is raised, a dict with a single ``"error"`` key holding a
        human-readable description of the failure.
    """
    try:
        logger.info(f"Searching posts for {blog_url} with query '{query_terms}'...")
        async with mcp_client:
            response = await mcp_client.call_tool("search_posts", {
                "blog_url": blog_url,
                "query_terms": query_terms,
                "num_posts": num_posts,
            })
            logger.info(f"Received response for {blog_url}: {response}")
            return response
    except Exception as e:
        # UI boundary: report the failure as data instead of raising, but
        # keep the traceback in the log so the root cause is not lost.
        message = f"Error searching posts for {blog_url}: {e}"
        logger.exception(message)
        return {"error": message}
|
|
|
def search_posts(blog_url: str, query_terms: str, num_posts: int) -> Dict:
    """Blocking wrapper around ``a_search_posts``.

    Runs the coroutine to completion with ``asyncio.run`` and returns its
    result unchanged.
    """
    pending_call = a_search_posts(blog_url, query_terms, num_posts)
    return asyncio.run(pending_call)
|
|
|
|
|
def build_gradio_interface():
    """Create an interface that connects to your MCP server.

    Builds a ``gr.Blocks`` app with three accordions -- "About Blog",
    "Recent Posts" (get + list) and "Search Posts". Each section has its own
    inputs, a JSON output pane and a button wired to the matching blocking
    wrapper (``get_blog_info_by_url``, ``get_recent_posts``,
    ``list_recent_posts``, ``search_posts``).

    Returns:
        The constructed ``gr.Blocks`` instance (not yet launched).
    """
    # Same default is pre-filled in every "Blog URL" textbox.
    default_blog_url = "https://blog.codonomics.com"

    with gr.Blocks(title="Query Blogger MCP Server") as ui:
        gr.Markdown("# Query Blogger MCP Server")

        with gr.Accordion("About Blog", open=True):
            gr.Markdown("## Get Blog Info by URL")
            with gr.Column(scale=2):
                in_blog_url = gr.Textbox(lines=1, label="Blog URL", value=default_blog_url)
                out_blog_info = gr.JSON(label="Output")
                btn_get_blog_info = gr.Button("Get Blog Info")
                btn_get_blog_info.click(
                    fn=get_blog_info_by_url,
                    inputs=[in_blog_url],
                    outputs=[out_blog_info],
                )

        with gr.Accordion("Recent Posts", open=False):
            gr.Markdown("## Get recent posts")
            with gr.Column(scale=2):
                in_blog_url_2 = gr.Textbox(lines=1, label="Blog URL", value=default_blog_url)
                # precision=0 makes the component hand an int to the handler;
                # each Number gets its own key so components stay distinct.
                in_num_posts = gr.Number(
                    label="Number of posts",
                    value=3,
                    minimum=1,
                    maximum=5,
                    step=1,
                    precision=0,
                    interactive=True,
                    key="num_posts_get_recent",
                )
                in_include_content = gr.Checkbox(
                    label="Include content",
                    value=True,
                    interactive=True,
                    key="include_content",
                )
                out_recent_posts = gr.JSON(label="Output")
                btn_get_recent_posts = gr.Button("Get recent posts")
                btn_get_recent_posts.click(
                    fn=get_recent_posts,
                    inputs=[in_blog_url_2, in_num_posts, in_include_content],
                    outputs=[out_recent_posts],
                )

            gr.Markdown("## List recent posts")
            with gr.Column(scale=2):
                in_blog_url_3 = gr.Textbox(lines=1, label="Blog URL", value=default_blog_url)
                in_num_posts_3 = gr.Number(
                    label="Number of posts",
                    value=3,
                    minimum=1,
                    maximum=10,
                    step=1,
                    precision=0,
                    interactive=True,
                    key="num_posts_list_recent",
                )
                out_recent_posts_3 = gr.JSON(label="Output")
                btn_list_recent_posts = gr.Button("List recent posts")
                btn_list_recent_posts.click(
                    fn=list_recent_posts,
                    inputs=[in_blog_url_3, in_num_posts_3],
                    outputs=[out_recent_posts_3],
                )

        with gr.Accordion("Search Posts", open=False):
            gr.Markdown("## Search Posts")
            with gr.Column(scale=2):
                in_blog_url_4 = gr.Textbox(lines=1, label="Blog URL", value=default_blog_url)
                in_query_terms = gr.Textbox(lines=1, label="Query Terms", value="AI")
                in_num_posts_4 = gr.Number(
                    label="Number of posts",
                    value=3,
                    minimum=1,
                    maximum=10,
                    step=1,
                    precision=0,
                    interactive=True,
                    key="num_posts_search",
                )
                out_search_results = gr.JSON(label="Output")
                btn_search_posts = gr.Button("Search Posts")
                btn_search_posts.click(
                    fn=search_posts,
                    inputs=[in_blog_url_4, in_query_terms, in_num_posts_4],
                    outputs=[out_search_results],
                )

    return ui
|
|
|
def run_gradio():
    """Build the Gradio UI and launch it on ``0.0.0.0:7860``.

    The app is launched with ``debug=True`` and without a public share link.
    """
    logger.info("Starting Gradio interface...")

    launch_options = {
        "debug": True,
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
    }
    build_gradio_interface().launch(**launch_options)
|
|
|
def start_mcp_server_async():
    """Run the MCP HTTP server on a fresh event loop in the calling thread.

    Used as the target of the background thread created in ``main``;
    ``asyncio.run`` returns only when the server coroutine finishes.
    """
    logger.info("Starting MCP server in async mode...")
    server_coro = run_mcp_server_in_http_mode()
    asyncio.run(server_coro)
|
|
|
async def wait_for_mcp_server(host, port, max_retries=30):
    """Wait for MCP server to be ready.

    Polls ``http://{host}:{port}/health`` until it answers with HTTP 200.

    Args:
        host: Host name or address used to build the health-check URL.
        port: Port used to build the health-check URL.
        max_retries: Maximum number of probes. One second is slept after
            every unsuccessful probe.

    Returns:
        True as soon as a probe returns status 200; False if all
        ``max_retries`` probes fail.
    """
    health_url = f"http://{host}:{port}/health"
    # Explicit ClientTimeout rather than a bare number: 5 s budget per probe.
    probe_timeout = aiohttp.ClientTimeout(total=5)

    # One session for the whole polling loop instead of one per attempt.
    async with aiohttp.ClientSession(timeout=probe_timeout) as session:
        for attempt in range(1, max_retries + 1):
            try:
                async with session.get(health_url) as response:
                    if response.status == 200:
                        logger.info("✅ MCP Server is ready!")
                        return True
                    logger.debug("Health check returned HTTP %s", response.status)
            except (aiohttp.ClientError, asyncio.TimeoutError, OSError) as exc:
                # Expected while the server is still starting; keep polling,
                # but leave a trace for debugging instead of swallowing it.
                logger.debug("Health check attempt failed: %r", exc)

            logger.info(f"Waiting for MCP server... ({attempt}/{max_retries})")
            await asyncio.sleep(1)

    logger.warning("❌ MCP Server not ready after waiting")
    return False
|
|
|
async def main():
    """Start the MCP server in a daemon thread and check that it responds.

    Spawns ``start_mcp_server_async`` on a background thread, polls the
    server's health endpoint, then pings it through the MCP client.
    """
    server_thread = threading.Thread(target=start_mcp_server_async, daemon=True)
    server_thread.start()

    logger.info("Waiting for MCP server to start...")
    host, port = settings.UVICORN_HOST, settings.UVICORN_PORT
    # NOTE(review): the readiness result is not inspected; the ping below is
    # attempted whether or not the health check ever succeeded.
    await wait_for_mcp_server(host, port)
    await ping_mcp_server()
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Step 1: bring up the MCP server in the background and verify it.
    startup = main()
    asyncio.run(startup)

    # Step 2: once the startup coroutine has finished, serve the Gradio UI.
    run_gradio()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|