# -*- coding: utf-8 -*-
"""
Gradio chat interface for an MCP (Model Context Protocol) client that uses
the Hugging Face Inference API as the language model backend.
"""
import asyncio
import os
import json
from typing import List, Dict, Any, Union, Optional, Tuple
from contextlib import AsyncExitStack
import logging
import traceback

# Third-party libraries
import httpx  # For async HTTP requests
import gradio as gr
from gradio.components.chatbot import ChatMessage  # Although type="messages" uses dicts primarily
from dotenv import load_dotenv

# MCP specific imports
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
# --- Configuration ---
load_dotenv()  # Load environment variables from .env file

# Hugging Face API Configuration
HF_TOKEN = os.getenv("HF_TOKEN")
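# A minimal .env file next to this script would contain just the token, e.g.
# (hypothetical value shown; never commit a real token):
#
#   HF_TOKEN=hf_xxxxxxxxxxxxxxxxxxxxxxxx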
# Specify the desired Hugging Face model endpoint
HF_API_URL = "https://router.huggingface.co/hf-inference/models/Qwen/Qwen3-235B-A22B/v1/chat/completions"
MODEL_NAME = "Qwen/Qwen3-235B-A22B"  # Model name for payload and display
MAX_TOKENS = 1500  # Max tokens for the LLM response
HTTP_TIMEOUT = 120  # Increased timeout for potentially slow model responses

# Default MCP Server Script Path
DEFAULT_SERVER_SCRIPT = "gradio_mcp_server.py"
# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,  # Set to DEBUG for more verbose output
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
)
logger = logging.getLogger(__name__)
# --- Async Event Loop ---
# At import time there is normally no running loop, so this typically creates a
# dedicated loop that the synchronous Gradio callbacks drive via run_until_complete().
try:
    loop = asyncio.get_running_loop()
except RuntimeError:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
logger.info("Asyncio event loop initialized.")
# --- MCP Client Wrapper Class ---
class MCPClientWrapper:
    """
    Manages the connection to the MCP server, interaction with the Hugging Face API,
    and Gradio message processing logic.
    """

    def __init__(self):
        """Initializes the wrapper, loading configuration."""
        self.session: Optional[ClientSession] = None
        self.exit_stack: Optional[AsyncExitStack] = None
        self.tools: List[Dict[str, Any]] = []
        self.http_client: Optional[httpx.AsyncClient] = None
        self.hf_token: Optional[str] = os.getenv("HF_TOKEN")
        if not self.hf_token:
            logger.warning("HF_TOKEN environment variable not found. Hugging Face API calls will be disabled.")
        else:
            # Log only a part of the token for verification, NEVER the full token.
            logger.info(f"HF_TOKEN loaded successfully (starts with: {self.hf_token[:4]}...).")
    async def _connect(self, server_path: str) -> str:
        """Establishes connection to the MCP server and initializes the HTTP client."""
        # Gracefully close existing resources if reconnecting
        if self.exit_stack:
            logger.info("Closing existing connection and resources before reconnecting.")
            await self.exit_stack.aclose()
            # Explicitly reset state variables
            self.exit_stack = None
            self.session = None
            self.http_client = None
            self.tools = []

        logger.info(f"Attempting to connect to MCP server script: {server_path}")
        self.exit_stack = AsyncExitStack()
        try:
            # Determine server command (python or node)
            is_python = server_path.lower().endswith('.py')
            command = "python" if is_python else "node"
            logger.info(f"Using command '{command}' for server.")

            # Configure MCP server parameters
            server_params = StdioServerParameters(
                command=command,
                args=[server_path],
                env={"PYTHONIOENCODING": "utf-8", "PYTHONUNBUFFERED": "1"}
            )

            # --- Establish MCP Connection ---
            logger.info("Initializing stdio transport...")
            stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
            self.stdio, self.write = stdio_transport
            logger.info("Stdio transport established.")

            logger.info("Initializing MCP client session...")
            self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
            await self.session.initialize()
            logger.info("MCP session initialized successfully.")
            # --- Initialize HTTP Client for Hugging Face ---
            if self.hf_token:
                logger.info("Initializing HTTP client for Hugging Face API...")
                self.http_client = await self.exit_stack.enter_async_context(
                    httpx.AsyncClient(timeout=HTTP_TIMEOUT)
                )
                logger.info("HTTP client initialized successfully.")
            else:
                logger.warning("HTTP client NOT initialized because HF_TOKEN is missing.")
                self.http_client = None  # Ensure it's None

            # --- List Available MCP Tools ---
            logger.info("Listing available tools from MCP server...")
            response = await self.session.list_tools()
            self.tools = [{
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.inputSchema  # Keep schema for potential richer prompts
            } for tool in response.tools]
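            # Each entry in self.tools ends up shaped roughly like the following
            # (hypothetical tool; the real names and schemas come from the server):
            #   {"name": "letter_counter",
            #    "description": "Count occurrences of a letter in a word",
            #    "input_schema": {"type": "object", "properties": {...}}}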
            tool_names = [tool["name"] for tool in self.tools]
            logger.info(f"Available tools retrieved: {tool_names if tool_names else 'None'}")

            # --- Prepare Connection Status Message ---
            connection_status = f"Connected to MCP server. Available tools: {', '.join(tool_names) if tool_names else 'None'}."
            if not self.http_client:
                connection_status += " Warning: Hugging Face client is INACTIVE (missing token)."
            return connection_status

        except Exception as e:
            logger.error(f"Connection failed: {e}", exc_info=True)
            # Ensure cleanup if connection fails at any point
            if self.exit_stack:
                await self.exit_stack.aclose()
            self.exit_stack = None
            self.session = None
            self.http_client = None
            return f"Connection Failed: {e}"

    def connect(self, server_path: str) -> str:
        """Synchronous wrapper for the async connect method."""
        return loop.run_until_complete(self._connect(server_path))
    def _format_tools_for_prompt(self) -> str:
        """Formats the available tool descriptions for the LLM prompt."""
        if not self.tools:
            return "No tools are available for use."

        tool_descriptions = []
        for tool in self.tools:
            desc = f"- Tool Name: `{tool['name']}`\n"
            desc += f"  Description: {tool['description']}\n"
            # Optionally include schema for complex tools, keep it concise if possible
            desc += f"  Input Format (JSON Schema): {json.dumps(tool['input_schema'])}"
            tool_descriptions.append(desc)

        # Specific instructions for the LLM on how to invoke a tool
        instruction = (
            "You have access to the following tools:\n"
            f"{chr(10).join(tool_descriptions)}\n\n"  # Use newline character explicitly
            "To use a tool, you MUST respond ONLY with a single JSON object "
            "containing 'tool_name' and 'tool_input' keys, like this:\n"
            "```json\n"
            "{\n"
            '  "tool_name": "<name_of_tool>",\n'
            '  "tool_input": { <arguments_object> }\n'
            "}\n"
            "```\n"
            "Do not include any other text, markdown formatting, or explanations "
            "before or after the JSON object when calling a tool."
        )
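        # With the instruction above, a well-formed tool call from the model is
        # expected to look like this (hypothetical tool and arguments):
        #   {"tool_name": "letter_counter", "tool_input": {"word": "strawberry", "letter": "r"}}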
        return instruction

    def _build_system_prompt(self) -> str:
        """Constructs the system prompt, including tool usage instructions."""
        base_prompt = "You are a helpful assistant. Respond concisely and accurately."
        tool_info = self._format_tools_for_prompt()
        # Only add tool info if tools are actually available
        if self.tools:
            return f"{base_prompt}\n\n{tool_info}"
        else:
            return base_prompt
    async def _call_huggingface_api(self, messages: List[Dict[str, str]]) -> Dict[str, Any]:
        """Makes the API call to the Hugging Face Inference endpoint."""
        # This function assumes self.hf_token and self.http_client are valid,
        # checked by the calling function (_process_query).
        headers = {
            "Authorization": f"Bearer {self.hf_token}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": MODEL_NAME,
            "messages": messages,
            "max_tokens": MAX_TOKENS,
            "stream": False,  # Use non-streaming for simplicity
            # Optional parameters:
            # "temperature": 0.7,
            # "top_p": 0.9,
        }
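        # The endpoint speaks the OpenAI-compatible chat-completions format, so a
        # successful reply is expected to look roughly like (abridged, illustrative only):
        #   {"choices": [{"message": {"role": "assistant", "content": "..."}}], ...}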
logger.info(f"Sending request to HF API ({MODEL_NAME}). Message count: {len(messages)}.") | |
# Avoid logging full payload in production if it contains sensitive data | |
# logger.debug(f"Payload (first message role): {messages[0]['role'] if messages else 'N/A'}") | |
try: | |
# Ensure http_client exists (redundant check for safety) | |
if not self.http_client: | |
logger.error("FATAL: _call_huggingface_api called but self.http_client is None!") | |
return {"error": "Internal state error: HTTP client is missing."} | |
response = await self.http_client.post(HF_API_URL, headers=headers, json=payload) | |
response.raise_for_status() # Raises HTTPStatusError for 4xx/5xx responses | |
logger.info(f"Received successful response from HF API (Status: {response.status_code}).") | |
return response.json() | |
except httpx.HTTPStatusError as e: | |
logger.error(f"HF API HTTP error: {e.response.status_code} - Response: {e.response.text}", exc_info=True) | |
return {"error": f"API request failed ({e.response.status_code})", "details": e.response.text} | |
except httpx.TimeoutException as e: | |
logger.error(f"HF API request timed out after {HTTP_TIMEOUT}s: {e}", exc_info=True) | |
return {"error": "API request timed out."} | |
except httpx.RequestError as e: | |
logger.error(f"HF API request error: {e}", exc_info=True) | |
return {"error": f"API request failed: {e}"} | |
        except json.JSONDecodeError as e:
            # Handle cases where the response body is not valid JSON
            raw_text = response.text if 'response' in locals() else 'Unknown response'
            logger.error(f"Failed to decode JSON response from HF API: {e}. Raw text: {raw_text}", exc_info=True)
            return {"error": "Invalid JSON response from API.", "raw_response": raw_text}
        except Exception as e:
            # Catch any other unexpected errors during the API call
            logger.error(f"An unexpected error occurred during HF API call: {e}", exc_info=True)
            return {"error": f"An unexpected error occurred: {e}"}
    def process_message(self, message: str, history: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], Dict]:
        """
        Handles incoming user messages, processes them using the LLM and tools,
        and returns the updated conversation history for Gradio.

        Args:
            message: The new message text from the user.
            history: The current conversation history (list of {'role': ..., 'content': ...} dicts).

        Returns:
            A tuple containing:
            - The complete updated conversation history (list of dicts).
            - A Gradio update dictionary to clear the input textbox.
        """
logger.info(f"Processing message: '{message[:50]}...'") | |
logger.debug(f"Received history (type: {type(history)}, len: {len(history)}).") | |
if history: | |
logger.debug(f"First history item type: {type(history[0])}, Keys: {history[0].keys() if isinstance(history[0], dict) else 'N/A'}") | |
# --- Create a working copy of the history --- | |
# Avoids modifying the state Gradio passed in directly. | |
current_conversation_history = list(history) | |
# --- Validate Connection State --- | |
if not self.session: | |
logger.warning("MCP session not available in process_message. Aborting.") | |
current_conversation_history.append({"role": "user", "content": message}) | |
current_conversation_history.append({"role": "assistant", "content": "Error: Not connected to MCP server. Please connect first."}) | |
return current_conversation_history, gr.update(value="") # Clear input | |
if not self.http_client or not self.hf_token: | |
logger.warning("Hugging Face client/token not ready in process_message. Aborting.") | |
current_conversation_history.append({"role": "user", "content": message}) | |
current_conversation_history.append({"role": "assistant", "content": "Error: Hugging Face client is not configured (missing token or connection issue?). Cannot process request."}) | |
return current_conversation_history, gr.update(value="") # Clear input | |
# --- Append User Message to Working History --- | |
current_conversation_history.append({"role": "user", "content": message}) | |
# --- Process Query Asynchronously --- | |
# Pass the full history (including new user message) to the async worker. | |
# Expect a list of *new* assistant messages generated in this turn. | |
try: | |
new_assistant_messages: List[Dict[str, Any]] = loop.run_until_complete( | |
self._process_query(current_conversation_history) | |
) | |
except Exception as e: | |
# Catch unexpected errors during the async processing itself | |
logger.error(f"Error during loop.run_until_complete(_process_query): {e}", exc_info=True) | |
# Add an error message to the output | |
new_assistant_messages = [{ | |
"role": "assistant", | |
"content": f"An internal error occurred while processing your request: {e}" | |
}] | |
# --- Combine History for Return --- | |
# final_history includes the original history, the user message, and the new assistant messages. | |
final_history = current_conversation_history + new_assistant_messages | |
logger.debug(f"Returning updated history (len: {len(final_history)}).") | |
# --- Return Updated State to Gradio --- | |
return final_history, gr.update(value="") # Return new history and clear input | |
    async def _process_query(self, conversation_history: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Async function to handle the core logic: call the LLM, handle potential tool calls.

        Args:
            conversation_history: The full conversation history up to and including
                the latest user message.

        Returns:
            A list containing the new assistant message(s) generated in this turn
            (text response, tool interactions, errors, etc.).
        """
        # List to hold the new message(s) generated by the assistant in this turn.
        new_turn_messages = []
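        # For a tool-using turn this list typically ends up holding several assistant
        # dicts: a "Using tool ..." status message (with metadata), a parameters
        # message, the formatted tool result, and the model's final summary.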
        # --- Prepare Messages for LLM API ---
        hf_messages = []
        # Add system prompt if not already present or if history is empty
        if not conversation_history or conversation_history[0].get("role") != "system":
            logger.debug("Adding system prompt.")
            hf_messages.append({"role": "system", "content": self._build_system_prompt()})

        # Process conversation history for the API call
        for msg in conversation_history:
            role = msg.get("role")
            content = msg.get("content")
            if not role or content is None:
                logger.warning(f"Skipping message with missing role/content: {msg}")
                continue
            content_str = content if isinstance(content, str) else json.dumps(content)
            # Add valid roles, prevent duplicate system prompts if handled above
            if role in ["user", "assistant"]:
                hf_messages.append({"role": role, "content": content_str})
            elif role == "system" and not hf_messages:  # Only add if system prompt wasn't added at start
                hf_messages.append({"role": role, "content": content_str})

        # --- Pre-API Call State Check ---
        token_ok = bool(self.hf_token)
        # Ensure http_client is not None and is the correct type
        client_ok = isinstance(self.http_client, httpx.AsyncClient)
        logger.info(f"State before API call: Token OK? {token_ok}, HTTP Client OK? {client_ok}")
        if not (token_ok and client_ok):
            logger.error("Pre-API call check FAILED: Token or Client not ready.")
            new_turn_messages.append({
                "role": "assistant",
                "content": "Internal Error: API client configuration problem detected before making the call."
            })
            return new_turn_messages  # Return error message

        # --- Make the First API Call ---
        logger.info("Making initial call to Hugging Face API...")
        response_data = await self._call_huggingface_api(hf_messages)

        # --- Handle Initial API Response ---
        if not response_data or "error" in response_data:
            error_msg = response_data.get("error", "Unknown API error") if response_data else "No response received"
            details = response_data.get("details", "") if response_data else ""
            logger.error(f"Initial API call failed: {error_msg}")
            new_turn_messages.append({
                "role": "assistant",
                "content": f"Sorry, there was an error calling the language model: {error_msg}" + (f"\nDetails: ```\n{details}\n```" if details else "")
            })
            return new_turn_messages  # Return list with error message

        # --- Extract Assistant Content ---
        try:
            assistant_content = response_data.get("choices", [{}])[0].get("message", {}).get("content", "")
            # Fallback for models that might use 'generated_text'
            if not assistant_content and "generated_text" in response_data:
                assistant_content = response_data["generated_text"]
            if not assistant_content:
                logger.error(f"Could not extract assistant content. Response keys: {response_data.keys()}")
                raise ValueError("Empty or missing assistant content in API response.")
            logger.info("Successfully extracted assistant content from initial response.")
            # logger.debug(f"Assistant raw content: {assistant_content}")  # Be cautious logging full content
        except (KeyError, IndexError, ValueError, TypeError) as e:
            logger.error(f"Error parsing initial API response structure: {e}. Response: {response_data}", exc_info=True)
            new_turn_messages.append({
                "role": "assistant",
                "content": f"Sorry, I received an unexpected response format from the language model. Parsing Error: {e}"
            })
            return new_turn_messages  # Return list with error message
        # --- Check for Tool Use Request ---
        tool_call_data = None
        try:
            # The LLM was instructed to respond *only* with JSON for tool calls
            potential_tool_call = json.loads(assistant_content)
            # Validate if it looks like our expected tool call structure
            if isinstance(potential_tool_call, dict) and "tool_name" in potential_tool_call and "tool_input" in potential_tool_call:
                tool_call_data = potential_tool_call
                logger.info(f"Detected tool call request for: {tool_call_data['tool_name']}")
            else:
                # Valid JSON, but not the specific format we requested for tools
                logger.info("Assistant response is valid JSON, but not a recognized tool call format. Treating as text.")
                # Keep assistant_content as is, tool_call_data remains None
        except json.JSONDecodeError:
            # Not JSON, so definitely treat as a regular text response
            logger.info("Assistant response is not JSON, treating as standard text response.")
            # Keep assistant_content as is, tool_call_data remains None
        # --- Process Based on Tool Call or Text ---
        if tool_call_data:
            # --- Handle Tool Call ---
            tool_name = tool_call_data.get("tool_name")
            tool_args = tool_call_data.get("tool_input", {})  # Default to empty dict if missing
            available_tool_names = [t["name"] for t in self.tools]
            if not tool_name or tool_name not in available_tool_names:
                logger.warning(f"LLM requested invalid or unavailable tool: '{tool_name}'")
                new_turn_messages.append({
                    "role": "assistant",
                    "content": f"I tried to use a tool named '{tool_name}', but it seems it's not available or the request was malformed. I will proceed without it."
                })
                # NOTE: Consider calling the LLM again here to inform it the tool failed.
                # For simplicity, we just return the warning message for now.
                return new_turn_messages
            # --- Tool is valid, proceed ---
            logger.info(f"Executing valid tool call: {tool_name}")
            # Add messages to Gradio indicating tool use initiation. Keep a reference
            # to the status message so its metadata can later be flipped to
            # 'done'/'error' without relying on fragile list indices.
            tool_status_msg = {
                "role": "assistant",
                "content": f"Okay, I need to use the **{tool_name}** tool.",
                "metadata": {"title": f"⏳ Using tool: {tool_name}", "status": "pending", "id": f"tool_call_{tool_name}"}
            }
            new_turn_messages.append(tool_status_msg)
            # Display parameters used (use ensure_ascii=False for better readability if needed)
            new_turn_messages.append({
                "role": "assistant",
                "content": f"Parameters:\n```json\n{json.dumps(tool_args, indent=2, ensure_ascii=False)}\n```",
                "metadata": {"parent_id": f"tool_call_{tool_name}", "id": f"params_{tool_name}", "title": "Tool Parameters"}
            })
            # --- Call the Actual MCP Tool ---
            try:
                mcp_result = await self.session.call_tool(tool_name, tool_args)
                tool_result_content = mcp_result.content
                # MCP tool results are typically a list of content blocks; flatten any
                # text blocks into a single string so the JSON/text handling below
                # operates on plain text.
                if isinstance(tool_result_content, list):
                    tool_result_content = "\n".join(
                        getattr(block, "text", str(block)) for block in tool_result_content
                    )
                logger.info(f"Successfully received result from MCP tool: {tool_name}")
                # Update the pending Gradio message status to 'done'
                tool_status_msg["metadata"]["status"] = "done"
                tool_status_msg["metadata"]["title"] = f"✅ Used tool: {tool_name}"
                # --- Display Tool Result in Gradio ---
                new_turn_messages.append({
                    "role": "assistant",
                    "content": f"Result from **{tool_name}**:",
                    "metadata": {"title": f"Tool Result: {tool_name}", "status": "done", "id": f"result_{tool_name}"}
                })
                # Format result for display (handle JSON, images, etc.)
                display_content = tool_result_content
                try:
                    result_json = json.loads(tool_result_content)
                    if isinstance(result_json, dict) and result_json.get("type") == "image" and "url" in result_json:
                        # Handle image result - Gradio chatbot can display images via dict path
                        display_content = {"path": result_json["url"], "alt_text": result_json.get("message", "Generated image")}
                        new_turn_messages.append({
                            "role": "assistant", "content": display_content,  # Send the dict
                            "metadata": {"parent_id": f"result_{tool_name}", "id": f"image_{tool_name}", "title": "Image Result"}
                        })
                        display_content = None  # Mark as handled so raw isn't added below
                    else:
                        # Nicely format other JSON
                        display_content = f"```json\n{json.dumps(result_json, indent=2, ensure_ascii=False)}\n```"
                except (json.JSONDecodeError, TypeError):
                    # Not JSON or image, display as plain code block if not empty
                    display_content = f"```\n{tool_result_content}\n```" if tool_result_content else "_Tool returned empty content._"
                # Add the formatted/raw result if not handled above (e.g., image)
                if display_content:
                    new_turn_messages.append({
                        "role": "assistant", "content": display_content,
                        "metadata": {"parent_id": f"result_{tool_name}", "id": f"raw_result_{tool_name}", "title": "Formatted Output"}
                    })
                # --- Send Tool Result Back to LLM for Final Response ---
                # Prepare message history for the second LLM call
                hf_messages_for_final_call = list(hf_messages)  # Start with messages from first call
                # Add the assistant's message that *was* the tool call JSON
                hf_messages_for_final_call.append({"role": "assistant", "content": assistant_content})
                # Add a user message containing the tool's result
                hf_messages_for_final_call.append({
                    "role": "user",
                    "content": f"The '{tool_name}' tool execution resulted in:\n```\n{tool_result_content}\n```\nPlease summarize this result or continue based on it."
                })
                logger.info("Sending tool result back to HF API for final interpretation.")

                # --- Pre-API Call State Check (Again) ---
                token_ok_final = bool(self.hf_token)
                client_ok_final = isinstance(self.http_client, httpx.AsyncClient)
                logger.info(f"State before final API call: Token OK? {token_ok_final}, HTTP Client OK? {client_ok_final}")
                if not (token_ok_final and client_ok_final):
                    logger.error("Pre-API call check FAILED before final call.")
                    new_turn_messages.append({"role": "assistant", "content": "Internal Error: Client state issue before getting final response after tool use."})
                    # Return messages generated so far (tool use + error)
                    return new_turn_messages
                # --- Make the Second API Call ---
                final_response_data = await self._call_huggingface_api(hf_messages_for_final_call)

                # --- Process Final LLM Response ---
                if final_response_data and "error" not in final_response_data:
                    try:
                        final_assistant_content = final_response_data.get("choices", [{}])[0].get("message", {}).get("content", "")
                        # Fallback for models that might use 'generated_text'
                        if not final_assistant_content and "generated_text" in final_response_data:
                            final_assistant_content = final_response_data["generated_text"]
                        if final_assistant_content:
                            logger.info("Successfully extracted final assistant response after tool use.")
                            new_turn_messages.append({"role": "assistant", "content": final_assistant_content})
                        else:
                            raise ValueError("Empty final assistant content after tool use.")
                    except Exception as e:
                        logger.error(f"Error parsing final API response after tool use: {e}", exc_info=True)
                        new_turn_messages.append({"role": "assistant", "content": f"Sorry, error processing the final response after tool use: {e}"})
                else:  # Handle error in the second API call itself
                    error_msg = final_response_data.get("error", "API Error") if final_response_data else "API Error"
                    details = final_response_data.get("details", "") if final_response_data else ""
                    logger.error(f"Final API call (after tool use) failed: {error_msg}")
                    new_turn_messages.append({"role": "assistant", "content": f"Sorry, error processing tool result with LLM: {error_msg}" + (f"\nDetails: ```\n{details}\n```" if details else "")})
            except Exception as e:  # Handle error during the MCP tool call (`session.call_tool`)
                logger.error(f"Error calling MCP tool '{tool_name}': {e}", exc_info=True)
                # Update the pending Gradio message status to 'error'
                tool_status_msg["metadata"]["status"] = "error"
                tool_status_msg["metadata"]["title"] = f"❌ Error using tool: {tool_name}"
                # Add error message for the user
                new_turn_messages.append({"role": "assistant", "content": f"Sorry, I encountered an error when trying to use the tool '{tool_name}': {e}"})
        else:
            # --- Handle Regular Text Response ---
            logger.info("Adding standard text response to Gradio output.")
            new_turn_messages.append({
                "role": "assistant",
                "content": assistant_content
            })

        # Return the list of *new* assistant messages generated in this turn
        return new_turn_messages
    async def close_connection(self):
        """Closes the MCP connection and HTTP client gracefully."""
        if self.exit_stack:
            logger.info("Closing MCP connection and HTTP client resources.")
            try:
                await self.exit_stack.aclose()
            except Exception as e:
                logger.error(f"Error during resource cleanup: {e}", exc_info=True)
            finally:
                # Reset state variables regardless of cleanup success
                self.exit_stack = None
                self.session = None
                self.http_client = None
                self.tools = []
                logger.info("Resources cleanup attempted.")
        else:
            logger.info("Close connection called but no active connection found.")
# --- Gradio Interface Definition ---
client = MCPClientWrapper()  # Instantiate the client wrapper globally


def create_gradio_interface() -> gr.Blocks:
    """Creates and configures the Gradio interface."""
    logger.info("Creating Gradio interface.")
    with gr.Blocks(
        title="MCP Client + HF Inference",
        theme="Nymbo/Nymbo_Theme_5",
        css="#chatbot { font-size: 1.1em; } .message { padding: 10px !important; }"  # Example CSS
    ) as demo:
        gr.Markdown(f"# 🤖 MCP Assistant ({MODEL_NAME})")
        gr.Markdown("Connect to an MCP server and chat with a Hugging Face LLM.")
        # Connection Row
        with gr.Row():
            server_path = gr.Textbox(
                label="MCP Server Script Path",
                placeholder="Enter path to server script",
                value=DEFAULT_SERVER_SCRIPT,  # Use default value
                scale=3
            )
            connect_btn = gr.Button("🔌 Connect to MCP Server", variant="primary", scale=1)
        status = gr.Textbox(label="Connection Status", interactive=False, placeholder="Not connected.")
        # Chatbot Display
        chatbot = gr.Chatbot(
            label="Conversation",
            elem_id="chatbot",
            height=650,
            show_copy_button=True,
            bubble_full_width=False,  # Chat bubbles don't span full width
            avatar_images=("👤", "🤖"),  # User and Hugging Face avatars
            type="messages",  # IMPORTANT: Use the dictionary format
            show_label=False  # Hide the "Conversation" label above chat
        )

        # Input Row
        with gr.Row():
            msg_textbox = gr.Textbox(
                label="Your Message",
                placeholder="Type your message here and press Enter...",
                scale=4,
                autofocus=True,
                show_label=False,  # Hide the "Your Message" label
                container=False  # Remove container padding/border for tighter look
            )
            clear_btn = gr.Button("🗑️ Clear Chat", scale=1)
        # --- Event Handlers ---
        # Connect Button Action
        connect_btn.click(
            fn=client.connect,    # Call the connect method
            inputs=[server_path], # Pass the server path textbox
            outputs=[status]      # Update the status textbox
        )

        # Message Submission Action (Enter key in textbox)
        msg_textbox.submit(
            fn=client.process_message,      # Call the main message processing function
            inputs=[msg_textbox, chatbot],  # Pass current message and chat history
            outputs=[chatbot, msg_textbox]  # Update chat history and clear message box
        )

        # Clear Button Action
        def clear_chat_and_input():
            logger.info("Clear chat button clicked.")
            return [], ""  # Return empty list for chatbot, empty string for textbox

        clear_btn.click(
            fn=clear_chat_and_input,
            inputs=[],
            outputs=[chatbot, msg_textbox],
            queue=False  # Don't queue this action
        )

    # Handle application shutdown (optional, but good practice)
    # demo.unload(client.close_connection)  # Requires newer Gradio; async handling can be complex
    logger.info("Gradio interface created successfully.")
    return demo
# --- Main Execution Block ---
if __name__ == "__main__":
    print("\n" + "=" * 60)
    print(" MCP Client with Hugging Face Inference API ")
    print(f" Model: {MODEL_NAME}")
    print("=" * 60 + "\n")

    # Check for Hugging Face token on startup
    if not HF_TOKEN:
        print("\n" + "*" * 60)
        print(" WARNING: HF_TOKEN environment variable not found! ")
        print(" Please set it in your .env file or environment variables.")
        print(" The application will run, but language model features")
        print(" requiring the Hugging Face API will be disabled.")
        print("*" * 60 + "\n")
    else:
        print("✅ HF_TOKEN found.\n")
    # Create and launch the Gradio interface
    interface = create_gradio_interface()
    print("Launching Gradio interface...")
    print("Press Ctrl+C to stop the server.")
    # Use server_name="0.0.0.0" to make accessible on local network
    # Use share=True for a temporary public link (requires Gradio account sometimes)
    try:
        # launch() blocks here until the server is shut down (e.g. with Ctrl+C).
        interface.launch(debug=True, server_name="0.0.0.0")
    except KeyboardInterrupt:
        logger.info("KeyboardInterrupt received, attempting shutdown.")
    finally:
        if client:
            print("Closing connections...")
            loop.run_until_complete(client.close_connection())
            print("Cleanup complete.")
        logger.info("Application shutting down.")
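# Usage note (assumptions, not verified against a specific environment): save this
# file as e.g. app.py with gradio_mcp_server.py in the same directory, install the
# dependencies (gradio, httpx, mcp, python-dotenv), set HF_TOKEN in a .env file,
# then run:
#   python app.py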