MCP-Image-Gen / app.py
Nymbo's picture
Update app.py
abffc09 verified
# -*- coding: utf-8 -*-
"""
Gradio Chat Interface for MCP (Meta Calling Protocol) Client
using Hugging Face Inference API for the Language Model.
"""
import asyncio
import os
import json
from typing import List, Dict, Any, Union, Optional, Tuple
from contextlib import AsyncExitStack
import logging
import traceback
# Third-party libraries
import httpx # For async HTTP requests
import gradio as gr
from gradio.components.chatbot import ChatMessage # Although type="messages" uses dicts primarily
from dotenv import load_dotenv
# MCP specific imports
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
# --- Configuration ---
load_dotenv() # Load environment variables from .env file
# Hugging Face API Configuration
HF_TOKEN = os.getenv("HF_TOKEN")
# Specify the desired Hugging Face model endpoint
HF_API_URL = "https://router.huggingface.co/hf-inference/models/Qwen/Qwen3-235B-A22B/v1/chat/completions"
MODEL_NAME = "Qwen/Qwen3-235B-A22B" # Model name for payload and display
MAX_TOKENS = 1500 # Max tokens for the LLM response
HTTP_TIMEOUT = 120 # Increased timeout for potentially slow model responses
# Default MCP Server Script Path
DEFAULT_SERVER_SCRIPT = "gradio_mcp_server.py"
# --- Logging Setup ---
logging.basicConfig(
level=logging.INFO, # Set to DEBUG for more verbose output
format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
)
logger = logging.getLogger(__name__)
# --- Async Event Loop ---
# Get the current event loop or create a new one if none exists
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
logger.info("Asyncio event loop initialized.")
# --- MCP Client Wrapper Class ---
class MCPClientWrapper:
"""
Manages the connection to the MCP server, interaction with Hugging Face API,
and Gradio message processing logic.
"""
def __init__(self):
"""Initializes the wrapper, loading configuration."""
self.session: Optional[ClientSession] = None
self.exit_stack: Optional[AsyncExitStack] = None
self.tools: List[Dict[str, Any]] = []
self.http_client: Optional[httpx.AsyncClient] = None
self.hf_token: Optional[str] = os.getenv("HF_TOKEN")
if not self.hf_token:
logger.warning("HF_TOKEN environment variable not found. Hugging Face API calls will be disabled.")
else:
# Log only a part of the token for verification, NEVER the full token.
logger.info(f"HF_TOKEN loaded successfully (starts with: {self.hf_token[:4]}...).")
async def _connect(self, server_path: str) -> str:
"""Establishes connection to the MCP server and initializes HTTP client."""
# Gracefully close existing resources if reconnecting
if self.exit_stack:
logger.info("Closing existing connection and resources before reconnecting.")
await self.exit_stack.aclose()
# Explicitly reset state variables
self.exit_stack = None
self.session = None
self.http_client = None
self.tools = []
logger.info(f"Attempting to connect to MCP server script: {server_path}")
self.exit_stack = AsyncExitStack()
try:
# Determine server command (python or node)
is_python = server_path.lower().endswith('.py')
command = "python" if is_python else "node"
logger.info(f"Using command '{command}' for server.")
# Configure MCP server parameters
server_params = StdioServerParameters(
command=command,
args=[server_path],
env={"PYTHONIOENCODING": "utf-8", "PYTHONUNBUFFERED": "1"}
)
# --- Establish MCP Connection ---
logger.info("Initializing stdio transport...")
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport
logger.info("Stdio transport established.")
logger.info("Initializing MCP client session...")
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
await self.session.initialize()
logger.info("MCP session initialized successfully.")
# --- Initialize HTTP Client for Hugging Face ---
if self.hf_token:
logger.info("Initializing HTTP client for Hugging Face API...")
self.http_client = await self.exit_stack.enter_async_context(
httpx.AsyncClient(timeout=HTTP_TIMEOUT)
)
logger.info("HTTP client initialized successfully.")
else:
logger.warning("HTTP client NOT initialized because HF_TOKEN is missing.")
self.http_client = None # Ensure it's None
# --- List Available MCP Tools ---
logger.info("Listing available tools from MCP server...")
response = await self.session.list_tools()
self.tools = [{
"name": tool.name,
"description": tool.description,
"input_schema": tool.inputSchema # Keep schema for potential richer prompts
} for tool in response.tools]
tool_names = [tool["name"] for tool in self.tools]
logger.info(f"Available tools retrieved: {tool_names if tool_names else 'None'}")
# --- Prepare Connection Status Message ---
connection_status = f"Connected to MCP server. Available tools: {', '.join(tool_names) if tool_names else 'None'}."
if not self.http_client:
connection_status += " Warning: Hugging Face client is INACTIVE (missing token)."
return connection_status
except Exception as e:
logger.error(f"Connection failed: {e}", exc_info=True)
# Ensure cleanup if connection fails at any point
if self.exit_stack:
await self.exit_stack.aclose()
self.exit_stack = None
self.session = None
self.http_client = None
return f"Connection Failed: {e}"
def connect(self, server_path: str) -> str:
"""Synchronous wrapper for the async connect method."""
return loop.run_until_complete(self._connect(server_path))
def _format_tools_for_prompt(self) -> str:
"""Formats the available tool descriptions for the LLM prompt."""
if not self.tools:
return "No tools are available for use."
tool_descriptions = []
for tool in self.tools:
desc = f"- Tool Name: `{tool['name']}`\n"
desc += f" Description: {tool['description']}\n"
# Optionally include schema for complex tools, keep it concise if possible
desc += f" Input Format (JSON Schema): {json.dumps(tool['input_schema'])}"
tool_descriptions.append(desc)
# Specific instructions for the LLM on how to invoke a tool
instruction = (
"You have access to the following tools:\n"
f"{chr(10).join(tool_descriptions)}\n\n" # Use newline character explicitly
"To use a tool, you MUST respond ONLY with a single JSON object "
"containing 'tool_name' and 'tool_input' keys, like this:\n"
"```json\n"
"{\n"
' "tool_name": "<name_of_tool>",\n'
' "tool_input": { <arguments_object> }\n'
"}\n"
"```\n"
"Do not include any other text, markdown formatting, or explanations "
"before or after the JSON object when calling a tool."
)
return instruction
def _build_system_prompt(self) -> str:
"""Constructs the system prompt, including tool usage instructions."""
base_prompt = "You are a helpful assistant. Respond concisely and accurately."
tool_info = self._format_tools_for_prompt()
# Only add tool info if tools are actually available
if self.tools:
return f"{base_prompt}\n\n{tool_info}"
else:
return base_prompt
async def _call_huggingface_api(self, messages: List[Dict[str, str]]) -> Dict[str, Any]:
"""Makes the API call to the Hugging Face Inference endpoint."""
# This function assumes self.hf_token and self.http_client are valid,
# checked by the calling function (_process_query).
headers = {
"Authorization": f"Bearer {self.hf_token}",
"Content-Type": "application/json",
}
payload = {
"model": MODEL_NAME,
"messages": messages,
"max_tokens": MAX_TOKENS,
"stream": False, # Use non-streaming for simplicity
# Optional parameters:
# "temperature": 0.7,
# "top_p": 0.9,
}
logger.info(f"Sending request to HF API ({MODEL_NAME}). Message count: {len(messages)}.")
# Avoid logging full payload in production if it contains sensitive data
# logger.debug(f"Payload (first message role): {messages[0]['role'] if messages else 'N/A'}")
try:
# Ensure http_client exists (redundant check for safety)
if not self.http_client:
logger.error("FATAL: _call_huggingface_api called but self.http_client is None!")
return {"error": "Internal state error: HTTP client is missing."}
response = await self.http_client.post(HF_API_URL, headers=headers, json=payload)
response.raise_for_status() # Raises HTTPStatusError for 4xx/5xx responses
logger.info(f"Received successful response from HF API (Status: {response.status_code}).")
return response.json()
except httpx.HTTPStatusError as e:
logger.error(f"HF API HTTP error: {e.response.status_code} - Response: {e.response.text}", exc_info=True)
return {"error": f"API request failed ({e.response.status_code})", "details": e.response.text}
except httpx.TimeoutException as e:
logger.error(f"HF API request timed out after {HTTP_TIMEOUT}s: {e}", exc_info=True)
return {"error": "API request timed out."}
except httpx.RequestError as e:
logger.error(f"HF API request error: {e}", exc_info=True)
return {"error": f"API request failed: {e}"}
except json.JSONDecodeError as e:
# Handle cases where the response is not valid JSON
response_text = await response.aread() if 'response' in locals() else b'Unknown response'
logger.error(f"Failed to decode JSON response from HF API: {e}. Raw text: {response_text.decode(errors='ignore')}", exc_info=True)
return {"error": "Invalid JSON response from API.", "raw_response": response_text.decode(errors='ignore')}
except Exception as e:
# Catch any other unexpected errors during the API call
logger.error(f"An unexpected error occurred during HF API call: {e}", exc_info=True)
return {"error": f"An unexpected error occurred: {e}"}
def process_message(self, message: str, history: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], Dict]:
"""
Handles incoming user messages, processes them using the LLM and tools,
and returns the updated conversation history for Gradio.
Args:
message: The new message text from the user.
history: The current conversation history (List of {'role':..., 'content':...} dicts).
Returns:
A tuple containing:
- The complete updated conversation history (List of dicts).
- A Gradio update dictionary to clear the input textbox.
"""
logger.info(f"Processing message: '{message[:50]}...'")
logger.debug(f"Received history (type: {type(history)}, len: {len(history)}).")
if history:
logger.debug(f"First history item type: {type(history[0])}, Keys: {history[0].keys() if isinstance(history[0], dict) else 'N/A'}")
# --- Create a working copy of the history ---
# Avoids modifying the state Gradio passed in directly.
current_conversation_history = list(history)
# --- Validate Connection State ---
if not self.session:
logger.warning("MCP session not available in process_message. Aborting.")
current_conversation_history.append({"role": "user", "content": message})
current_conversation_history.append({"role": "assistant", "content": "Error: Not connected to MCP server. Please connect first."})
return current_conversation_history, gr.update(value="") # Clear input
if not self.http_client or not self.hf_token:
logger.warning("Hugging Face client/token not ready in process_message. Aborting.")
current_conversation_history.append({"role": "user", "content": message})
current_conversation_history.append({"role": "assistant", "content": "Error: Hugging Face client is not configured (missing token or connection issue?). Cannot process request."})
return current_conversation_history, gr.update(value="") # Clear input
# --- Append User Message to Working History ---
current_conversation_history.append({"role": "user", "content": message})
# --- Process Query Asynchronously ---
# Pass the full history (including new user message) to the async worker.
# Expect a list of *new* assistant messages generated in this turn.
try:
new_assistant_messages: List[Dict[str, Any]] = loop.run_until_complete(
self._process_query(current_conversation_history)
)
except Exception as e:
# Catch unexpected errors during the async processing itself
logger.error(f"Error during loop.run_until_complete(_process_query): {e}", exc_info=True)
# Add an error message to the output
new_assistant_messages = [{
"role": "assistant",
"content": f"An internal error occurred while processing your request: {e}"
}]
# --- Combine History for Return ---
# final_history includes the original history, the user message, and the new assistant messages.
final_history = current_conversation_history + new_assistant_messages
logger.debug(f"Returning updated history (len: {len(final_history)}).")
# --- Return Updated State to Gradio ---
return final_history, gr.update(value="") # Return new history and clear input
async def _process_query(self, conversation_history: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Async function to handle the core logic: call LLM, handle potential tool calls.
Args:
conversation_history: The full conversation history up to and including
the latest user message.
Returns:
A list containing the new assistant message(s) generated in this turn
(text response, tool interactions, errors, etc.).
"""
# List to hold the new message(s) generated by the assistant in this turn.
new_turn_messages = []
# --- Prepare Messages for LLM API ---
hf_messages = []
# Add system prompt if not already present or if history is empty
if not conversation_history or conversation_history[0].get("role") != "system":
logger.debug("Adding system prompt.")
hf_messages.append({"role": "system", "content": self._build_system_prompt()})
# Process conversation history for the API call
for msg in conversation_history:
role = msg.get("role")
content = msg.get("content")
if not role or content is None:
logger.warning(f"Skipping message with missing role/content: {msg}")
continue
content_str = content if isinstance(content, str) else json.dumps(content)
# Add valid roles, prevent duplicate system prompts if handled above
if role in ["user", "assistant"]:
hf_messages.append({"role": role, "content": content_str})
elif role == "system" and not hf_messages: # Only add if system prompt wasn't added at start
hf_messages.append({"role": role, "content": content_str})
# --- Pre-API Call State Check ---
token_ok = bool(self.hf_token)
# Ensure http_client is not None and is the correct type
client_ok = isinstance(self.http_client, httpx.AsyncClient)
logger.info(f"State before API call: Token OK? {token_ok}, HTTP Client OK? {client_ok}")
if not (token_ok and client_ok):
logger.error("Pre-API call check FAILED: Token or Client not ready.")
new_turn_messages.append({
"role": "assistant",
"content": "Internal Error: API client configuration problem detected before making the call."
})
return new_turn_messages # Return error message
# --- Make the First API Call ---
logger.info("Making initial call to Hugging Face API...")
response_data = await self._call_huggingface_api(hf_messages)
# --- Handle Initial API Response ---
if not response_data or "error" in response_data:
error_msg = response_data.get("error", "Unknown API error") if response_data else "No response received"
details = response_data.get("details", "") if response_data else ""
logger.error(f"Initial API call failed: {error_msg}")
new_turn_messages.append({
"role": "assistant",
"content": f"Sorry, there was an error calling the language model: {error_msg}" + (f"\nDetails: ```\n{details}\n```" if details else "")
})
return new_turn_messages # Return list with error message
# --- Extract Assistant Content ---
try:
assistant_content = response_data.get("choices", [{}])[0].get("message", {}).get("content", "")
# Fallback for models that might use 'generated_text'
if not assistant_content and "generated_text" in response_data:
assistant_content = response_data["generated_text"]
if not assistant_content:
logger.error(f"Could not extract assistant content. Response keys: {response_data.keys()}")
raise ValueError("Empty or missing assistant content in API response.")
logger.info("Successfully extracted assistant content from initial response.")
# logger.debug(f"Assistant raw content: {assistant_content}") # Be cautious logging full content
except (KeyError, IndexError, ValueError, TypeError) as e:
logger.error(f"Error parsing initial API response structure: {e}. Response: {response_data}", exc_info=True)
new_turn_messages.append({
"role": "assistant",
"content": f"Sorry, I received an unexpected response format from the language model. Parsing Error: {e}"
})
return new_turn_messages # Return list with error message
# --- Check for Tool Use Request ---
tool_call_data = None
try:
# The LLM was instructed to respond *only* with JSON for tool calls
potential_tool_call = json.loads(assistant_content)
# Validate if it looks like our expected tool call structure
if isinstance(potential_tool_call, dict) and "tool_name" in potential_tool_call and "tool_input" in potential_tool_call:
tool_call_data = potential_tool_call
logger.info(f"Detected tool call request for: {tool_call_data['tool_name']}")
else:
# Valid JSON, but not the specific format we requested for tools
logger.info("Assistant response is valid JSON, but not a recognized tool call format. Treating as text.")
# Keep assistant_content as is, tool_call_data remains None
except json.JSONDecodeError:
# Not JSON, so definitely treat as a regular text response
logger.info("Assistant response is not JSON, treating as standard text response.")
# Keep assistant_content as is, tool_call_data remains None
# --- Process Based on Tool Call or Text ---
if tool_call_data:
# --- Handle Tool Call ---
tool_name = tool_call_data.get("tool_name")
tool_args = tool_call_data.get("tool_input", {}) # Default to empty dict if missing
available_tool_names = [t["name"] for t in self.tools]
if not tool_name or tool_name not in available_tool_names:
logger.warning(f"LLM requested invalid or unavailable tool: '{tool_name}'")
new_turn_messages.append({
"role": "assistant",
"content": f"I tried to use a tool named '{tool_name}', but it seems it's not available or the request was malformed. I will proceed without it."
})
# NOTE: Consider calling the LLM again here to inform it the tool failed.
# For simplicity, we just return the warning message for now.
return new_turn_messages
# --- Tool is valid, proceed ---
logger.info(f"Executing valid tool call: {tool_name}")
# Add messages to Gradio indicating tool use initiation
new_turn_messages.append({
"role": "assistant",
"content": f"Okay, I need to use the **{tool_name}** tool.",
"metadata": {"title": f"⏳ Using tool: {tool_name}", "status": "pending", "id": f"tool_call_{tool_name}"}
})
# Display parameters used (use ensure_ascii=False for better readability if needed)
new_turn_messages.append({
"role": "assistant",
"content": f"Parameters:\n```json\n{json.dumps(tool_args, indent=2, ensure_ascii=False)}\n```",
"metadata": {"parent_id": f"tool_call_{tool_name}", "id": f"params_{tool_name}", "title": "Tool Parameters"}
})
# --- Call the Actual MCP Tool ---
try:
mcp_result = await self.session.call_tool(tool_name, tool_args)
tool_result_content = mcp_result.content
logger.info(f"Successfully received result from MCP tool: {tool_name}")
# Update Gradio message status to 'done'
if new_turn_messages and "metadata" in new_turn_messages[-2]:
new_turn_messages[-2]["metadata"]["status"] = "done"
new_turn_messages[-2]["metadata"]["title"] = f"βœ… Used tool: {tool_name}"
# --- Display Tool Result in Gradio ---
new_turn_messages.append({
"role": "assistant",
"content": f"Result from **{tool_name}**:",
"metadata": {"title": f"Tool Result: {tool_name}", "status": "done", "id": f"result_{tool_name}"}
})
# Format result for display (handle JSON, images, etc.)
display_content = tool_result_content
try:
result_json = json.loads(tool_result_content)
if isinstance(result_json, dict) and result_json.get("type") == "image" and "url" in result_json:
# Handle image result - Gradio chatbot can display images via dict path
display_content = {"path": result_json["url"], "alt_text": result_json.get("message", "Generated image")}
new_turn_messages.append({
"role": "assistant", "content": display_content, # Send the dict
"metadata": {"parent_id": f"result_{tool_name}", "id": f"image_{tool_name}", "title": "Image Result"}
})
display_content = None # Mark as handled so raw isn't added below
else:
# Nicely format other JSON
display_content = f"```json\n{json.dumps(result_json, indent=2, ensure_ascii=False)}\n```"
except (json.JSONDecodeError, TypeError):
# Not JSON or image, display as plain code block if not empty
display_content = f"```\n{tool_result_content}\n```" if tool_result_content else "_Tool returned empty content._"
# Add the formatted/raw result if not handled above (e.g., image)
if display_content:
new_turn_messages.append({
"role": "assistant", "content": display_content,
"metadata": {"parent_id": f"result_{tool_name}", "id": f"raw_result_{tool_name}", "title": "Formatted Output"}
})
# --- Send Tool Result Back to LLM for Final Response ---
# Prepare message history for the second LLM call
hf_messages_for_final_call = list(hf_messages) # Start with messages from first call
# Add the assistant's message that *was* the tool call JSON
hf_messages_for_final_call.append({"role": "assistant", "content": assistant_content})
# Add a user message containing the tool's result
hf_messages_for_final_call.append({
"role": "user",
"content": f"The '{tool_name}' tool execution resulted in:\n```\n{tool_result_content}\n```\nPlease summarize this result or continue based on it."
})
logger.info("Sending tool result back to HF API for final interpretation.")
# --- Pre-API Call State Check (Again) ---
token_ok_final = bool(self.hf_token)
client_ok_final = isinstance(self.http_client, httpx.AsyncClient)
logger.info(f"State before final API call: Token OK? {token_ok_final}, HTTP Client OK? {client_ok_final}")
if not (token_ok_final and client_ok_final):
logger.error("Pre-API call check FAILED before final call.")
new_turn_messages.append({"role": "assistant", "content": "Internal Error: Client state issue before getting final response after tool use."})
# Return messages generated so far (tool use + error)
return new_turn_messages
# --- Make the Second API Call ---
final_response_data = await self._call_huggingface_api(hf_messages_for_final_call)
# --- Process Final LLM Response ---
if final_response_data and "error" not in final_response_data:
try:
final_assistant_content = final_response_data.get("choices", [{}])[0].get("message", {}).get("content", "")
# ... (fallback for generated_text) ...
if final_assistant_content:
logger.info("Successfully extracted final assistant response after tool use.")
new_turn_messages.append({"role": "assistant", "content": final_assistant_content})
else:
raise ValueError("Empty final assistant content after tool use.")
except Exception as e:
logger.error(f"Error parsing final API response after tool use: {e}", exc_info=True)
new_turn_messages.append({"role": "assistant", "content": f"Sorry, error processing the final response after tool use: {e}"})
else: # Handle error in the second API call itself
error_msg = final_response_data.get("error", "API Error") if final_response_data else "API Error"
details = final_response_data.get("details", "") if final_response_data else ""
logger.error(f"Final API call (after tool use) failed: {error_msg}")
new_turn_messages.append({"role": "assistant", "content": f"Sorry, error processing tool result with LLM: {error_msg}" + (f"\nDetails: ```\n{details}\n```" if details else "")})
except Exception as e: # Handle error during the MCP tool call (`session.call_tool`)
logger.error(f"Error calling MCP tool '{tool_name}': {e}", exc_info=True)
# Update Gradio status to 'error'
if new_turn_messages and "metadata" in new_turn_messages[-2]:
new_turn_messages[-2]["metadata"]["status"] = "error"
new_turn_messages[-2]["metadata"]["title"] = f"❌ Error using tool: {tool_name}"
# Add error message for the user
new_turn_messages.append({"role": "assistant", "content": f"Sorry, I encountered an error when trying to use the tool '{tool_name}': {e}"})
else:
# --- Handle Regular Text Response ---
logger.info("Adding standard text response to Gradio output.")
new_turn_messages.append({
"role": "assistant",
"content": assistant_content
})
# Return the list of *new* assistant messages generated in this turn
return new_turn_messages
async def close_connection(self):
"""Closes the MCP connection and HTTP client gracefully."""
if self.exit_stack:
logger.info("Closing MCP connection and HTTP client resources.")
try:
await self.exit_stack.aclose()
except Exception as e:
logger.error(f"Error during resource cleanup: {e}", exc_info=True)
finally:
# Reset state variables regardless of cleanup success
self.exit_stack = None
self.session = None
self.http_client = None
self.tools = []
logger.info("Resources cleanup attempted.")
else:
logger.info("Close connection called but no active connection found.")
# --- Gradio Interface Definition ---
client = MCPClientWrapper() # Instantiate the client wrapper globally
def create_gradio_interface() -> gr.Blocks:
"""Creates and configures the Gradio interface."""
logger.info("Creating Gradio interface.")
with gr.Blocks(
title="MCP Client + HF Inference",
theme="Nymbo/Nymbo_Theme_5",
css="#chatbot { font-size: 1.1em; } .message { padding: 10px !important; }" # Example CSS
) as demo:
gr.Markdown(f"# πŸ€– MCP Assistant ({MODEL_NAME})")
gr.Markdown("Connect to an MCP server and chat with a Hugging Face LLM.")
# Connection Row
with gr.Row():
server_path = gr.Textbox(
label="MCP Server Script Path",
placeholder="Enter path to server script",
value=DEFAULT_SERVER_SCRIPT, # Use default value
scale=3
)
connect_btn = gr.Button("πŸ”Œ Connect to MCP Server", variant="primary", scale=1)
status = gr.Textbox(label="Connection Status", interactive=False, placeholder="Not connected.")
# Chatbot Display
chatbot = gr.Chatbot(
label="Conversation",
elem_id="chatbot",
height=650,
show_copy_button=True,
bubble_full_width=False, # Chat bubbles don't span full width
avatar_images=("πŸ‘€", "πŸ€—"), # User and Hugging Face avatars
type="messages", # IMPORTANT: Use the dictionary format
show_label=False # Hide the "Conversation" label above chat
)
# Input Row
with gr.Row():
msg_textbox = gr.Textbox(
label="Your Message",
placeholder="Type your message here and press Enter...",
scale=4,
autofocus=True,
show_label=False, # Hide the "Your Message" label
container=False # Remove container padding/border for tighter look
)
clear_btn = gr.Button("πŸ—‘οΈ Clear Chat", scale=1)
# --- Event Handlers ---
# Connect Button Action
connect_btn.click(
fn=client.connect, # Call the connect method
inputs=[server_path], # Pass the server path textbox
outputs=[status] # Update the status textbox
)
# Message Submission Action (Enter key in textbox)
msg_textbox.submit(
fn=client.process_message, # Call the main message processing function
inputs=[msg_textbox, chatbot], # Pass current message and chat history
outputs=[chatbot, msg_textbox] # Update chat history and clear message box
)
# Clear Button Action
def clear_chat_and_input():
logger.info("Clear chat button clicked.")
return [], "" # Return empty list for chatbot, empty string for textbox
clear_btn.click(
fn=clear_chat_and_input,
inputs=[],
outputs=[chatbot, msg_textbox],
queue=False # Don't queue this action
)
# Handle application shutdown (optional, but good practice)
# demo.unload(client.close_connection) # Requires newer Gradio, async handling can be complex
logger.info("Gradio interface created successfully.")
return demo
# --- Main Execution Block ---
if __name__ == "__main__":
print("\n" + "="*60)
print(" MCP Client with Hugging Face Inference API ")
print(f" Model: {MODEL_NAME}")
print("="*60 + "\n")
# Check for Hugging Face token on startup
if not HF_TOKEN:
print("\n" + "*"*60)
print(" WARNING: HF_TOKEN environment variable not found! ")
print(" Please set it in your .env file or environment variables.")
print(" The application will run, but language model features")
print(" requiring the Hugging Face API will be disabled.")
print("*"*60 + "\n")
else:
print("βœ“ HF_TOKEN found.\n")
# Create and launch the Gradio interface
interface = create_gradio_interface()
print("Launching Gradio interface...")
# Use server_name="0.0.0.0" to make accessible on local network
# Use share=True for a temporary public link (requires Gradio account sometimes)
interface.launch(debug=True, server_name="0.0.0.0")
print("\nInterface launched. Access it at the URL provided above.")
print("Press Ctrl+C to stop the server.")
# Optional: Add explicit cleanup on exit using asyncio if demo.unload isn't used/sufficient
try:
# Gradio's launch() typically blocks, so this part might only run after shutdown
pass
except KeyboardInterrupt:
logger.info("KeyboardInterrupt received, attempting shutdown.")
if client:
print("Closing connections...")
loop.run_until_complete(client.close_connection())
print("Cleanup complete.")
finally:
logger.info("Application shutting down.")