import gradio as gr | |
import asyncio | |
import json | |
import os | |
import re | |
import base64 | |
from typing import List, Dict, Any, Optional | |
from dataclasses import dataclass | |
import anthropic | |
from datetime import datetime | |
import logging | |
import traceback | |
# Import the proper MCP client components | |
from mcp import ClientSession | |
from mcp.client.sse import sse_client | |
# Optional import for file upload functionality | |
try: | |
import httpx | |
HTTPX_AVAILABLE = True | |
except ImportError: | |
HTTPX_AVAILABLE = False | |
logging.warning("httpx not available - file upload functionality limited") | |
# Set up enhanced logging | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
@dataclass
class MCPServerConfig:
    """Configuration for a single MCP server."""
    name: str
    url: str
    description: str
    space_id: Optional[str] = None
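# Illustrative sketch (not part of the original app): how a server config could be built by
# hand before registering it with the client. The space name and URLs below are hypothetical
# placeholders, not real endpoints.
def _example_server_config() -> MCPServerConfig:
    """Return a sample MCPServerConfig pointing at a hypothetical Gradio Space."""
    return MCPServerConfig(
        name="Example Image Generator",  # display name, used as the key in UniversalMCPClient.servers
        url="https://user-example-space.hf.space/gradio_api/mcp/sse",  # hypothetical MCP SSE endpoint
        description="Hypothetical text-to-image MCP server",
        space_id="user/example-space",  # hypothetical Hugging Face Space id
    )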
class UniversalMCPClient: | |
def __init__(self): | |
self.servers: Dict[str, MCPServerConfig] = {} | |
self.anthropic_client = None | |
# Initialize Anthropic client if API key is available | |
if os.getenv("ANTHROPIC_API_KEY"): | |
self.anthropic_client = anthropic.Anthropic( | |
api_key=os.getenv("ANTHROPIC_API_KEY") | |
) | |
logger.info("β Anthropic client initialized") | |
else: | |
logger.warning("β οΈ ANTHROPIC_API_KEY not found") | |
async def add_server_async(self, config: MCPServerConfig) -> tuple[bool, str]: | |
"""Add an MCP server using pure MCP protocol""" | |
try: | |
logger.info(f"π§ Adding MCP server: {config.name} at {config.url}") | |
# Clean and validate URL - handle various input formats | |
original_url = config.url.strip() | |
# Remove common MCP endpoint variations | |
base_url = original_url | |
for endpoint in ["/gradio_api/mcp/sse", "/gradio_api/mcp/", "/gradio_api/mcp"]: | |
if base_url.endswith(endpoint): | |
base_url = base_url[:-len(endpoint)] | |
break | |
# Remove trailing slashes | |
base_url = base_url.rstrip("/") | |
# Construct proper MCP URL | |
mcp_url = f"{base_url}/gradio_api/mcp/sse" | |
logger.info(f"π§ Original URL: {original_url}") | |
logger.info(f"π§ Base URL: {base_url}") | |
logger.info(f"π§ MCP URL: {mcp_url}") | |
# Extract space ID if it's a HuggingFace space | |
if "hf.space" in base_url: | |
space_parts = base_url.split("/") | |
if len(space_parts) >= 1: | |
space_id = space_parts[-1].replace('.hf.space', '').replace('https://', '').replace('http://', '') | |
if '-' in space_id: | |
# Format: username-spacename.hf.space | |
config.space_id = space_id.replace('-', '/', 1) | |
else: | |
config.space_id = space_id | |
logger.info(f"π Detected HF Space ID: {config.space_id}") | |
# Update config with proper MCP URL | |
config.url = mcp_url | |
# Test MCP connection | |
success, message = await self._test_mcp_connection(config) | |
if success: | |
self.servers[config.name] = config | |
logger.info(f"β MCP Server {config.name} added successfully") | |
return True, f"β Successfully added MCP server: {config.name}\n{message}" | |
else: | |
logger.error(f"β Failed to connect to MCP server {config.name}: {message}") | |
return False, f"β Failed to add server: {config.name}\n{message}" | |
except Exception as e: | |
error_msg = f"Failed to add server {config.name}: {str(e)}" | |
logger.error(error_msg) | |
logger.error(traceback.format_exc()) | |
return False, f"β {error_msg}" | |
async def _test_mcp_connection(self, config: MCPServerConfig) -> tuple[bool, str]: | |
"""Test MCP server connection with detailed debugging""" | |
try: | |
logger.info(f"π Testing MCP connection to {config.url}") | |
timeout_seconds = 20.0 | |
async with sse_client(config.url, timeout=timeout_seconds) as (read_stream, write_stream): | |
async with ClientSession(read_stream, write_stream) as session: | |
# Initialize MCP session | |
logger.info("π§ Initializing MCP session...") | |
await session.initialize() | |
# List available tools | |
logger.info("π Listing available tools...") | |
tools = await session.list_tools() | |
tool_info = [] | |
for tool in tools.tools: | |
tool_info.append(f" - {tool.name}: {tool.description}") | |
logger.info(f" π Tool: {tool.name}") | |
logger.info(f" Description: {tool.description}") | |
if hasattr(tool, 'inputSchema') and tool.inputSchema: | |
logger.info(f" Input Schema: {tool.inputSchema}") | |
if len(tools.tools) == 0: | |
return False, "No tools found on MCP server" | |
message = f"Connected successfully!\nFound {len(tools.tools)} tools:\n" + "\n".join(tool_info) | |
return True, message | |
except asyncio.TimeoutError: | |
return False, "Connection timeout - server may be sleeping or unreachable" | |
except Exception as e: | |
logger.error(f"MCP connection failed: {e}") | |
logger.error(traceback.format_exc()) | |
return False, f"Connection failed: {str(e)}" | |
def _extract_media_from_mcp_response(self, result_text: str, config: MCPServerConfig) -> Optional[str]: | |
"""Enhanced media extraction from MCP responses""" | |
if not isinstance(result_text, str): | |
logger.info(f"π Non-string result: {type(result_text)}") | |
return None | |
base_url = config.url.replace("/gradio_api/mcp/sse", "") | |
logger.info(f"π Processing MCP result for media: {result_text[:300]}...") | |
logger.info(f"π Base URL: {base_url}") | |
# 1. Try to parse as JSON (most Gradio MCP servers return structured data) | |
try: | |
if result_text.strip().startswith('[') or result_text.strip().startswith('{'): | |
logger.info("π Attempting JSON parse...") | |
data = json.loads(result_text.strip()) | |
logger.info(f"π Parsed JSON structure: {data}") | |
# Handle array format: [{'image': {'url': '...'}}] or [{'url': '...'}] | |
if isinstance(data, list) and len(data) > 0: | |
item = data[0] | |
logger.info(f"π First array item: {item}") | |
if isinstance(item, dict): | |
# Check for nested media structure | |
for media_type in ['image', 'audio', 'video']: | |
if media_type in item and isinstance(item[media_type], dict): | |
media_data = item[media_type] | |
if 'url' in media_data: | |
url = media_data['url'] | |
logger.info(f"π― Found {media_type} URL: {url}") | |
return self._resolve_media_url(url, base_url) | |
# Check for direct URL | |
if 'url' in item: | |
url = item['url'] | |
logger.info(f"π― Found direct URL: {url}") | |
return self._resolve_media_url(url, base_url) | |
# Handle object format: {'image': {'url': '...'}} or {'url': '...'} | |
elif isinstance(data, dict): | |
logger.info(f"π Processing dict: {data}") | |
# Check for nested media structure | |
for media_type in ['image', 'audio', 'video']: | |
if media_type in data and isinstance(data[media_type], dict): | |
media_data = data[media_type] | |
if 'url' in media_data: | |
url = media_data['url'] | |
logger.info(f"π― Found {media_type} URL: {url}") | |
return self._resolve_media_url(url, base_url) | |
# Check for direct URL | |
if 'url' in data: | |
url = data['url'] | |
logger.info(f"π― Found direct URL: {url}") | |
return self._resolve_media_url(url, base_url) | |
except json.JSONDecodeError: | |
logger.info("π Not valid JSON, trying other formats...") | |
except Exception as e: | |
logger.warning(f"π JSON parsing error: {e}") | |
# 2. Check for data URLs (base64 encoded media) | |
if result_text.startswith('data:'): | |
logger.info("π― Found data URL") | |
return result_text | |
        # 3. Check for raw base64 image data (PNG / JPEG / RIFF magic prefixes; non-JPEG falls back to PNG)
        if any(result_text.startswith(pattern) for pattern in ['iVBORw0KGgoAAAANSUhEU', '/9j/', 'UklGR']):
            logger.info("🎯 Found base64 image data")
            mime_type = 'image/jpeg' if result_text.startswith('/9j/') else 'image/png'
            return f"data:{mime_type};base64,{result_text}"
# 4. Check for file paths and convert to URLs | |
media_extensions = ['.png', '.jpg', '.jpeg', '.gif', '.webp', '.mp3', '.wav', '.ogg', '.m4a', '.flac', '.mp4', '.avi', '.mov'] | |
if any(ext in result_text.lower() for ext in media_extensions): | |
# Extract just the filename if it's a path | |
if '/' in result_text: | |
filename = result_text.split('/')[-1] | |
else: | |
filename = result_text.strip() | |
# Create Gradio file URL | |
if filename.startswith('http'): | |
media_url = filename | |
else: | |
media_url = f"{base_url}/file={filename}" | |
logger.info(f"π― Found media file: {media_url}") | |
return media_url | |
# 5. Check for HTTP URLs that look like media | |
if result_text.startswith('http') and any(ext in result_text.lower() for ext in media_extensions): | |
logger.info(f"π― Found HTTP media URL: {result_text}") | |
return result_text | |
logger.info("β No media detected in result") | |
return None | |
    def _resolve_media_url(self, url: str, base_url: str) -> str:
        """Resolve relative URLs or file paths to absolute URLs on the Gradio server"""
        if url.startswith('http') or url.startswith('data:'):
            return url
        # Both absolute and relative file paths are served via Gradio's /file= endpoint
        return f"{base_url}/file={url}"
def _convert_file_to_accessible_url(self, file_path: str, base_url: str) -> str: | |
"""Convert local file path to accessible URL for MCP servers""" | |
try: | |
# Extract filename | |
filename = file_path.split('/')[-1] if '/' in file_path else file_path | |
# For Gradio MCP servers, we can use the /file= endpoint | |
# This assumes the MCP server can access the same file system or we upload it | |
accessible_url = f"{base_url}/file={filename}" | |
logger.info(f"π Converted file path to accessible URL: {accessible_url}") | |
return accessible_url | |
except Exception as e: | |
logger.error(f"Failed to convert file to accessible URL: {e}") | |
return file_path # Fallback to original path | |
async def upload_file_to_gradio_server(self, file_path: str, target_server_url: str) -> Optional[str]: | |
"""Upload a local file to a Gradio server and return the accessible URL""" | |
if not HTTPX_AVAILABLE: | |
logger.error("httpx not available for file upload") | |
return None | |
try: | |
import httpx | |
# Remove MCP endpoint to get base URL | |
base_url = target_server_url.replace("/gradio_api/mcp/sse", "") | |
upload_url = f"{base_url}/upload" | |
# Read the file | |
with open(file_path, "rb") as f: | |
file_content = f.read() | |
# Get filename | |
filename = file_path.split('/')[-1] if '/' in file_path else file_path | |
# Upload file to Gradio server | |
files = {"file": (filename, file_content)} | |
async with httpx.AsyncClient() as client: | |
response = await client.post(upload_url, files=files, timeout=30.0) | |
if response.status_code == 200: | |
# Gradio usually returns the file path/URL in the response | |
result = response.json() | |
if isinstance(result, list) and len(result) > 0: | |
uploaded_path = result[0] | |
# Convert to accessible URL | |
accessible_url = f"{base_url}/file={uploaded_path}" | |
logger.info(f"π€ Successfully uploaded file: {accessible_url}") | |
return accessible_url | |
logger.warning(f"File upload failed with status {response.status_code}") | |
return None | |
except Exception as e: | |
logger.error(f"Failed to upload file to Gradio server: {e}") | |
return None | |
def _check_file_upload_compatibility(self, config: MCPServerConfig) -> str: | |
"""Check if a server likely supports file uploads""" | |
if "hf.space" in config.url: | |
return "π‘ Hugging Face Space (usually compatible)" | |
elif "gradio" in config.url.lower(): | |
return "π’ Gradio server (likely compatible)" | |
elif "localhost" in config.url or "127.0.0.1" in config.url: | |
return "π’ Local server (file access available)" | |
else: | |
return "π΄ Remote server (may need public URLs)" | |
def get_server_status(self) -> Dict[str, str]: | |
"""Get status of all configured servers""" | |
status = {} | |
for name in self.servers: | |
compatibility = self._check_file_upload_compatibility(self.servers[name]) | |
status[name] = f"β Connected (MCP Protocol) - {compatibility}" | |
return status | |
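# Hedged sketch (not used by the app): the class above only lists tools during the connection
# test and delegates tool invocation to Claude's MCP connector. A direct tool call over the same
# SSE transport might look roughly like this; the tool name and arguments are hypothetical
# placeholders.
async def _call_mcp_tool_directly(mcp_sse_url: str, tool_name: str, arguments: Dict[str, Any]) -> str:
    """Open an MCP session, call a single tool, and return its text content."""
    async with sse_client(mcp_sse_url, timeout=20.0) as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            result = await session.call_tool(tool_name, arguments)
            # Tool results carry a list of content blocks; text blocks expose a `.text` attribute
            parts = [getattr(block, "text", str(block)) for block in result.content]
            return "\n".join(parts)
# Usage sketch (hypothetical server, tool, and prompt):
#   text = asyncio.run(_call_mcp_tool_directly(
#       "https://user-example-space.hf.space/gradio_api/mcp/sse",
#       "generate_image",
#       {"prompt": "a sunset over the ocean"},
#   ))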
# Global MCP client instance | |
mcp_client = UniversalMCPClient() | |
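# Hedged illustration (assumption, never called by the app): representative MCP result strings
# that _extract_media_from_mcp_response is written to recognize, one per parsing branch. The
# server URL and file names are hypothetical placeholders.
def _media_extraction_examples() -> List[Optional[str]]:
    """Feed sample MCP result payloads through the media extractor and return the resolved URLs."""
    demo_config = MCPServerConfig(
        name="demo",
        url="https://user-example-space.hf.space/gradio_api/mcp/sse",
        description="hypothetical server",
    )
    samples = [
        '[{"image": {"url": "https://user-example-space.hf.space/file=out.png"}}]',  # nested list form
        '{"url": "/tmp/outputs/result.png"}',                                         # dict form with a relative path
        'data:image/png;base64,AAAA',                                                 # data URL (dummy payload) passed through unchanged
        'generated_audio.wav',                                                        # bare filename with a media extension
    ]
    return [mcp_client._extract_media_from_mcp_response(sample, demo_config) for sample in samples]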
def chat_with_mcp(message: Dict[str, Any], history: List[Dict[str, Any]]) -> tuple[List[Dict[str, Any]], gr.MultimodalTextbox]:
    """Enhanced MCP chat function with multimodal input support"""
    if not mcp_client.anthropic_client:
        error_msg = "❌ Anthropic API key not configured. Please set the ANTHROPIC_API_KEY environment variable."
        history.append({"role": "assistant", "content": error_msg})
        return history, gr.MultimodalTextbox(value=None, interactive=False)
# Initialize variables for error handling | |
user_text = "" | |
user_files = [] | |
try: | |
# Handle multimodal input - message is a dict with 'text' and 'files' | |
user_text = message.get("text", "") if message else "" | |
user_files = message.get("files", []) if message else [] | |
# Handle case where message might be a string (backward compatibility) | |
if isinstance(message, str): | |
user_text = message | |
user_files = [] | |
logger.info(f"π¬ Processing multimodal message:") | |
logger.info(f" π Text: {user_text}") | |
logger.info(f" π Files: {len(user_files)} files uploaded") | |
# Add uploaded files to chat history first | |
for file_path in user_files: | |
logger.info(f" π File: {file_path}") | |
history.append({"role": "user", "content": {"path": file_path}}) | |
# Add text message if provided | |
if user_text and user_text.strip(): | |
history.append({"role": "user", "content": user_text}) | |
# If no text and no files, return early | |
if not user_text.strip() and not user_files: | |
return history, gr.MultimodalTextbox(value=None, interactive=False) | |
# Create messages for Claude API | |
messages = [] | |
# Convert history to Claude API format (text only for context) | |
recent_history = history[-16:] if len(history) > 16 else history | |
for msg in recent_history: | |
if msg.get("role") in ["user", "assistant"]: | |
content = msg.get("content", "") | |
# Convert any non-string content to string description for context | |
if isinstance(content, dict): | |
if "path" in content: | |
file_path = content.get('path', 'unknown') | |
# Determine file type for context | |
if any(ext in file_path.lower() for ext in ['.png', '.jpg', '.jpeg', '.gif', '.webp']): | |
content = f"[User uploaded an image: {file_path}]" | |
elif any(ext in file_path.lower() for ext in ['.mp3', '.wav', '.ogg', '.m4a', '.flac']): | |
content = f"[User uploaded an audio file: {file_path}]" | |
elif any(ext in file_path.lower() for ext in ['.mp4', '.avi', '.mov']): | |
content = f"[User uploaded a video file: {file_path}]" | |
else: | |
content = f"[User uploaded a file: {file_path}]" | |
else: | |
content = f"[Object: {str(content)[:50]}...]" | |
elif isinstance(content, (list, tuple)): | |
content = f"[List: {str(content)[:50]}...]" | |
elif content is None: | |
content = "[Empty]" | |
else: | |
content = str(content) | |
messages.append({ | |
"role": msg["role"], | |
"content": content | |
}) | |
# Check if we have MCP servers to use | |
if not mcp_client.servers: | |
# No MCP servers - use regular Claude API for simple chat | |
logger.info("π¬ No MCP servers available, using regular Claude chat") | |
system_prompt = f"""You are Claude Sonnet 4, a helpful AI assistant with native multimodal capabilities. You can have conversations, answer questions, help with various tasks, and provide information on a wide range of topics. | |
YOUR NATIVE CAPABILITIES (Available right now): | |
- **Image Understanding**: You can directly see and describe images, analyze their content, read text in images, identify objects, people, scenes, etc. | |
- **Text Processing**: You can analyze, summarize, translate, and process text directly | |
- **General Knowledge**: You can answer questions, explain concepts, and have conversations | |
- **Code Analysis**: You can read, analyze, and explain code | |
Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | |
IMPORTANT: You DO NOT need MCP servers for: | |
- Describing or analyzing uploaded images | |
- Reading text in images | |
- Identifying objects, people, or scenes in images | |
- General conversation and knowledge questions | |
You DO need MCP servers for: | |
- Creating new images, audio, or video | |
- Editing or transforming existing media files | |
- Transcribing audio files | |
- Processing non-image files (audio, video, documents) | |
If users upload images and ask you to describe or analyze them, use your native vision capabilities immediately. Only mention MCP servers if they ask for creation or editing tasks.""" | |
# Use regular messages API | |
response = mcp_client.anthropic_client.messages.create( | |
model="claude-sonnet-4-20250514", | |
max_tokens=2048, | |
system=system_prompt, | |
messages=messages | |
) | |
else: | |
# We have MCP servers - use the MCP connector API | |
mcp_servers = [] | |
for server_name, config in mcp_client.servers.items(): | |
mcp_servers.append({ | |
"type": "url", | |
"url": config.url, | |
"name": server_name.replace(" ", "_").lower() | |
}) | |
# Enhanced system prompt with multimodal and MCP instructions | |
uploaded_files_context = "" | |
if user_files: | |
uploaded_files_context = f"\n\nFILES UPLOADED BY USER:\n" | |
for i, file_path in enumerate(user_files, 1): | |
file_name = file_path.split('/')[-1] if '/' in file_path else file_path | |
if any(ext in file_path.lower() for ext in ['.png', '.jpg', '.jpeg', '.gif', '.webp']): | |
file_type = "Image" | |
elif any(ext in file_path.lower() for ext in ['.mp3', '.wav', '.ogg', '.m4a', '.flac']): | |
file_type = "Audio" | |
elif any(ext in file_path.lower() for ext in ['.mp4', '.avi', '.mov']): | |
file_type = "Video" | |
else: | |
file_type = "File" | |
uploaded_files_context += f"{i}. {file_type}: {file_name} (path: {file_path})\n" | |
# Enhanced system prompt with Claude's native capabilities and MCP usage | |
system_prompt = f"""You are Claude Sonnet 4, a helpful AI assistant with both native multimodal capabilities and access to various MCP tools. | |
YOUR NATIVE CAPABILITIES (No MCP tools needed): | |
- **Image Understanding**: You can directly see and describe images, analyze their content, read text in images, etc. | |
- **Text Processing**: You can analyze, summarize, translate, and process text directly | |
- **General Knowledge**: You can answer questions, explain concepts, and have conversations | |
- **Code Analysis**: You can read, analyze, and explain code | |
WHEN TO USE MCP TOOLS: | |
- **Image Generation**: Creating new images from text prompts | |
- **Image Editing**: Modifying, enhancing, or transforming existing images | |
- **Audio Processing**: Transcribing audio, generating speech, audio enhancement | |
- **Video Processing**: Creating or editing videos | |
- **Specialized Analysis**: Tasks requiring specific models or APIs | |
UPLOADED FILES HANDLING: | |
{uploaded_files_context} | |
IMPORTANT - For uploaded images: | |
- **Image Description/Analysis**: Use your NATIVE vision capabilities - you can see and describe images directly | |
- **Image Editing/Enhancement**: Use MCP image processing tools | |
- **Image Generation**: Use MCP image generation tools | |
IMPORTANT - File URL Conversion for MCP Tools: | |
When using MCP tools that require file inputs, you need to be aware that uploaded files have local paths that remote MCP servers cannot access. | |
For uploaded files in MCP tool calls: | |
- If an MCP tool fails with "Invalid file data format" or similar errors about file paths | |
- The issue is that remote MCP servers cannot access local file paths like '/tmp/gradio/...' | |
- In such cases, inform the user that the MCP server requires files to be accessible via public URLs | |
- Suggest that they need a "File Upload" MCP server or that the specific MCP server may need configuration for file handling | |
Current uploaded files that may need URL conversion: | |
{uploaded_files_context} | |
IMPORTANT - GRADIO MEDIA DISPLAY: | |
When MCP tools return media, end your response with "MEDIA_GENERATED: [URL]" where [URL] is the actual media URL. | |
Examples: | |
- User uploads image + "What's in this image?" β Use NATIVE vision (no MCP needed) | |
- User uploads image + "Make this vintage" β Use MCP image editing tool | |
- User says "Generate a sunset image" β Use MCP image generation tool | |
- User uploads audio + "Transcribe this" β Use MCP transcription tool | |
Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | |
Available MCP servers: {list(mcp_client.servers.keys())}""" | |
# Debug logging | |
logger.info(f"π€ Sending {len(messages)} messages to Claude API") | |
logger.info(f"π§ Using {len(mcp_servers)} MCP servers") | |
# Call Claude with MCP connector using the correct beta API | |
response = mcp_client.anthropic_client.beta.messages.create( | |
model="claude-sonnet-4-20250514", | |
max_tokens=2048, | |
system=system_prompt, | |
messages=messages, | |
mcp_servers=mcp_servers, | |
betas=["mcp-client-2025-04-04"] | |
) | |
response_text = "" | |
media_url = None | |
current_server_name = None # Track the current server for tool results | |
# Process Claude's response | |
for content in response.content: | |
if content.type == "text": | |
response_text += content.text | |
# Check if Claude indicated media was generated | |
if "MEDIA_GENERATED:" in content.text: | |
media_match = re.search(r"MEDIA_GENERATED:\s*([^\s]+)", content.text) | |
if media_match: | |
media_url = media_match.group(1) | |
# Clean up the response text | |
response_text = re.sub(r"MEDIA_GENERATED:\s*[^\s]+", "", response_text).strip() | |
logger.info(f"π― Claude indicated media generated: {media_url}") | |
elif hasattr(content, 'type') and content.type == "mcp_tool_use": | |
tool_name = content.name | |
server_name = content.server_name | |
current_server_name = server_name # Remember for the result | |
logger.info(f"π§ Claude used MCP tool: {tool_name} on server: {server_name}") | |
response_text += f"\n\nπ§ Used {tool_name} successfully!" | |
elif hasattr(content, 'type') and content.type == "mcp_tool_result": | |
# mcp_tool_result blocks don't have server_name, but we can use the last one | |
tool_use_id = getattr(content, 'tool_use_id', 'unknown') | |
logger.info(f"π Processing MCP tool result (tool_use_id: {tool_use_id})") | |
if content.content: | |
result_content = content.content[0] | |
result_text = result_content.text if hasattr(result_content, 'text') else str(result_content) | |
logger.info(f"π MCP tool result: {result_text[:200]}...") | |
response_text += f"\n\n**Result**: {result_text}" | |
# Try to extract media from the result using the current server | |
if current_server_name and current_server_name in mcp_client.servers: | |
config = mcp_client.servers[current_server_name] | |
extracted_media = mcp_client._extract_media_from_mcp_response(result_text, config) | |
if extracted_media: | |
media_url = extracted_media | |
logger.info(f"π― Extracted media from MCP result: {media_url}") | |
else: | |
# Fallback: try all servers to find media | |
for server_name, config in mcp_client.servers.items(): | |
extracted_media = mcp_client._extract_media_from_mcp_response(result_text, config) | |
if extracted_media: | |
media_url = extracted_media | |
logger.info(f"π― Extracted media from MCP result (fallback): {media_url}") | |
break | |
else: | |
response_text += f"\n\nβ Tool call failed: No content returned" | |
if not response_text: | |
response_text = "I understand your request and I'm here to help." | |
# Add assistant response to history | |
history.append({"role": "assistant", "content": response_text}) | |
# Add media as separate message if we have it | |
if media_url: | |
logger.info(f"π¨ Adding media to chat: {media_url}") | |
history.append({"role": "assistant", "content": {"path": media_url}}) | |
return history, gr.MultimodalTextbox(value=None, interactive=False) | |
except Exception as e: | |
error_msg = f"β Error: {str(e)}" | |
logger.error(f"Chat error: {e}") | |
logger.error(traceback.format_exc()) | |
# Add user input to history if it exists | |
if user_text and user_text.strip(): | |
history.append({"role": "user", "content": user_text}) | |
if user_files: | |
for file_path in user_files: | |
history.append({"role": "user", "content": {"path": file_path}}) | |
history.append({"role": "assistant", "content": error_msg}) | |
return history, gr.MultimodalTextbox(value=None, interactive=False) | |
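# Hedged sketch (mirrors the inline logic in chat_with_mcp rather than adding new behavior):
# the shape of the `mcp_servers` parameter passed to Claude's MCP connector - one dict per
# registered server with "type": "url", the SSE endpoint, and a normalized name.
def _build_mcp_server_specs(client: UniversalMCPClient) -> List[Dict[str, str]]:
    """Build the mcp_servers list in the format used by the beta messages API call above."""
    return [
        {
            "type": "url",
            "url": config.url,
            "name": server_name.replace(" ", "_").lower(),
        }
        for server_name, config in client.servers.items()
    ]
# Usage sketch (same call pattern as chat_with_mcp, with a hypothetical prompt):
#   response = mcp_client.anthropic_client.beta.messages.create(
#       model="claude-sonnet-4-20250514",
#       max_tokens=2048,
#       messages=[{"role": "user", "content": "Generate a sunset image"}],
#       mcp_servers=_build_mcp_server_specs(mcp_client),
#       betas=["mcp-client-2025-04-04"],
#   )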
def convert_hf_space_to_url(space_name: str) -> str: | |
""" | |
Convert HuggingFace space name to proper URL format. | |
HuggingFace URL rules: | |
- Replace "/" with "-" | |
- Convert to lowercase | |
- Replace dots and other special chars with "-" | |
- Remove consecutive hyphens | |
""" | |
if "/" not in space_name: | |
raise ValueError("Space name should be in format: username/space-name") | |
# Replace "/" with "-" | |
url_name = space_name.replace("/", "-") | |
# Convert to lowercase | |
url_name = url_name.lower() | |
    # Replace dots and other special characters with hyphens (re is imported at module level)
    url_name = re.sub(r'[^a-z0-9\-]', '-', url_name)
# Remove consecutive hyphens | |
url_name = re.sub(r'-+', '-', url_name) | |
# Remove leading/trailing hyphens | |
url_name = url_name.strip('-') | |
return f"https://{url_name}.hf.space" | |
def add_custom_server(name: str, space_name: str) -> tuple[str, str]: | |
"""Add a custom MCP server from HuggingFace space name""" | |
logger.info(f"β Adding MCP server: {name} from space: {space_name}") | |
if not name or not space_name: | |
return "β Please provide both server name and space name", "" | |
space_name = space_name.strip() | |
try: | |
# Use the improved URL conversion | |
mcp_url = convert_hf_space_to_url(space_name) | |
logger.info(f"π Converted {space_name} β {mcp_url}") | |
except ValueError as e: | |
return f"β {str(e)}", "" | |
config = MCPServerConfig( | |
name=name.strip(), | |
url=mcp_url, | |
description=f"MCP server from HuggingFace space: {space_name}", | |
space_id=space_name | |
) | |
try: | |
# Run async function | |
def run_async(): | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
try: | |
return loop.run_until_complete(mcp_client.add_server_async(config)) | |
finally: | |
loop.close() | |
success, message = run_async() | |
logger.info(f"Server addition result: {success} - {message}") | |
if success: | |
# Format success message for accordion display | |
tools_info = "" | |
if 'Found' in message and 'tools:' in message: | |
tools_section = message.split('Found')[1] | |
tools_info = f"**Available Tools:**\n{tools_section}" | |
details_html = f""" | |
<details style="margin-top: 10px;"> | |
<summary style="cursor: pointer; padding: 8px; background: #f0f0f0; border-radius: 4px;"><strong>β {name} - Connection Details</strong></summary> | |
<div style="padding: 10px; border-left: 3px solid #28a745; margin-left: 10px; margin-top: 5px;"> | |
<p><strong>Space:</strong> {space_name}</p> | |
<p><strong>Base URL:</strong> {mcp_url}</p> | |
<p><strong>Status:</strong> Connected successfully!</p> | |
<div style="margin-top: 10px;"> | |
            {tools_info.replace(chr(10), '<br>')}
</div> | |
</div> | |
</details> | |
""" | |
return "β Server added successfully!", details_html | |
else: | |
error_html = f""" | |
<details style="margin-top: 10px;"> | |
<summary style="cursor: pointer; padding: 8px; background: #f8d7da; border-radius: 4px;"><strong>β {name} - Connection Failed</strong></summary> | |
<div style="padding: 10px; border-left: 3px solid #dc3545; margin-left: 10px; margin-top: 5px;"> | |
<p>{message}</p> | |
</div> | |
</details> | |
""" | |
return f"β Failed to add server: {name}", error_html | |
except Exception as e: | |
error_msg = f"β Failed to add server: {str(e)}" | |
logger.error(error_msg) | |
logger.error(traceback.format_exc()) | |
return error_msg, "" | |
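# Hedged sketch (assumption: a generalized version of the run_async pattern used inside
# add_custom_server). The Gradio callbacks here are synchronous, so async MCP coroutines are
# driven to completion on a dedicated event loop.
def _run_coro_blocking(coro):
    """Run an awaitable to completion from synchronous code on a fresh event loop."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()
# Usage sketch (same behaviour as the inline run_async helper above):
#   success, message = _run_coro_blocking(mcp_client.add_server_async(config))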
def get_server_status() -> tuple[str, str]: | |
"""Get status of all servers in accordion format""" | |
try: | |
status = mcp_client.get_server_status() | |
server_count = f"**Total MCP Servers**: {len(status)}" | |
if not status: | |
return server_count, "<p><em>No MCP servers configured yet.</em></p>" | |
accordion_html = "" | |
for name, state in status.items(): | |
server_config = mcp_client.servers[name] | |
base_url = server_config.url.replace("/gradio_api/mcp/sse", "") | |
            # Determine health status (matches the "✅ Connected" prefix set by UniversalMCPClient.get_server_status)
            health = "🟢 Healthy" if "✅ Connected" in state else "🔴 Unhealthy"
accordion_html += f""" | |
<details style="margin-bottom: 10px;"> | |
<summary style="cursor: pointer; padding: 8px; background: #e9ecef; border-radius: 4px;"><strong>π§ {name}</strong></summary> | |
<div style="padding: 10px; border-left: 3px solid #007bff; margin-left: 10px; margin-top: 5px;"> | |
<p><strong>Title:</strong> {name}</p> | |
<p><strong>Status:</strong> Connected (MCP Protocol)</p> | |
<p><strong>Health:</strong> {health}</p> | |
<p><strong>Base URL:</strong> {base_url}</p> | |
</div> | |
</details> | |
""" | |
return server_count, accordion_html | |
except Exception as e: | |
return "**Total MCP Servers**: 0", f"<p style='color: red;'>β Error getting status: {str(e)}</p>" | |
# Create Gradio Interface | |
def create_interface(): | |
# Custom CSS for better layout | |
custom_css = """ | |
/* Hide Gradio footer */ | |
footer { | |
display: none !important; | |
} | |
/* Make chatbot expand to fill available space */ | |
.gradio-container { | |
height: 100vh !important; | |
} | |
/* Ensure proper flex layout */ | |
.main-content { | |
display: flex; | |
flex-direction: column; | |
height: 100%; | |
} | |
/* Input area stays at bottom with minimal padding */ | |
.input-area { | |
margin-top: auto; | |
padding-top: 0.25rem !important; | |
padding-bottom: 0 !important; | |
margin-bottom: 0 !important; | |
} | |
/* Reduce padding around chatbot */ | |
.chatbot { | |
margin-bottom: 0 !important; | |
padding-bottom: 0 !important; | |
} | |
""" | |
with gr.Blocks( | |
title="Universal MCP Client", | |
theme=gr.themes.Citrus(), | |
fill_height=True, | |
css=custom_css | |
) as demo: | |
# Sidebar with relevant information | |
with gr.Sidebar(): | |
gr.Markdown("# Gradio.chat.app") | |
# Collapsible information section | |
with gr.Accordion("π Guide & Info", open=True): | |
gr.Markdown(""" | |
## β Quick Start | |
**Native Capabilities:** | |
- ποΈ **Image Understanding**: Upload & ask "What's in this?" | |
- π¬ **Chat**: All conversation capabilities | |
- π§ **Analysis**: Code, text, documents | |
**MCP Servers:** | |
- π¨ **Generate**: Images, audio, content | |
- β‘ **Process**: Files via connected servers | |
- π§ **Edit**: Transform existing media | |
""") | |
gr.Markdown(""" | |
## π― How It Works | |
1. **Direct Tasks**: Claude handles image analysis instantly | |
2. **Generation**: MCP servers create new content | |
3. **File Processing**: Server-dependent compatibility | |
## π File Support | |
- **Images**: PNG, JPG, GIF, WebP | |
- **Audio**: MP3, WAV, M4A, FLAC | |
- **Video**: MP4, AVI, MOV | |
- **Documents**: PDF, TXT, DOCX | |
""") | |
# Server status (not in accordion) - make it reactive | |
gr.Markdown("## π§ Server Status") | |
server_count_display = gr.Markdown(f"**Connected Servers**: {len(mcp_client.servers)}") | |
if mcp_client.servers: | |
server_list = "\n".join([f"β’ **{name}**" for name in mcp_client.servers.keys()]) | |
server_list_display = gr.Markdown(server_list) | |
else: | |
server_list_display = gr.Markdown("*No servers connected*\n\nAdd servers below.") | |
# Server management in accordion | |
with gr.Accordion("βοΈ Manage Servers", open=False): | |
gr.Markdown("### Add MCP Server") | |
server_name = gr.Textbox( | |
label="Server Title", | |
placeholder="Text to Image Generator" | |
) | |
space_name = gr.Textbox( | |
label="HuggingFace Space Name", | |
placeholder="ysharma/dalle-3-xl-lora-v2" | |
) | |
add_server_btn = gr.Button("Add Server", variant="primary") | |
add_server_output = gr.Textbox(label="Status", interactive=False) | |
add_server_details = gr.HTML(label="Details") | |
status_btn = gr.Button("Refresh Status", variant="secondary") | |
status_count = gr.Markdown("**Total MCP Servers**: 0") | |
status_output = gr.HTML() | |
# Main chat area - full height | |
with gr.Column(elem_classes="main-content"): | |
# Chatbot takes most of the space | |
chatbot = gr.Chatbot( | |
label="Universal MCP-Powered Multimodal Chatbot", | |
show_label=False, | |
type="messages", | |
scale=1, # Expand to fill available space | |
show_copy_button=True, | |
avatar_images=None | |
) | |
# Input area at bottom - fixed size | |
with gr.Column(scale=0, elem_classes="input-area"): | |
chat_input = gr.MultimodalTextbox( | |
interactive=True, | |
file_count="multiple", | |
placeholder="Enter message or upload files (images, audio, video, documents)...", | |
show_label=False, | |
sources=["upload", "microphone"], | |
file_types=None # Accept all file types | |
) | |
# Event handlers for multimodal chat | |
def submit_message(message, history): | |
if message and (message.get("text", "").strip() or message.get("files", [])): | |
new_history, cleared_input = chat_with_mcp(message, history) | |
return new_history, cleared_input | |
return history, gr.MultimodalTextbox(value=None, interactive=False) | |
def enable_input(): | |
return gr.MultimodalTextbox(interactive=True) | |
def update_server_display(): | |
"""Update the server status display in sidebar""" | |
server_count = len(mcp_client.servers) | |
count_text = f"**Connected Servers**: {server_count}" | |
if mcp_client.servers: | |
server_list = "\n".join([f"β’ **{name}**" for name in mcp_client.servers.keys()]) | |
return count_text, server_list | |
else: | |
return count_text, "*No servers connected*\n\nAdd servers below." | |
def handle_add_server(name, space_name): | |
"""Handle adding a server and update displays""" | |
status_msg, details_html = add_custom_server(name, space_name) | |
# Update sidebar server display | |
count_text, list_text = update_server_display() | |
return status_msg, details_html, count_text, list_text, "", "" # Clear inputs | |
def handle_refresh_status(): | |
"""Handle refresh status button""" | |
count_text, accordions_html = get_server_status() | |
return count_text, accordions_html | |
# Set up the chat flow - using built-in submit functionality | |
chat_msg_enter = chat_input.submit( | |
submit_message, | |
inputs=[chat_input, chatbot], | |
outputs=[chatbot, chat_input] | |
) | |
chat_msg_enter.then(enable_input, None, [chat_input]) | |
# Server management functionality | |
add_server_btn.click( | |
handle_add_server, | |
inputs=[server_name, space_name], | |
outputs=[add_server_output, add_server_details, server_count_display, server_list_display, server_name, space_name] | |
) | |
status_btn.click( | |
handle_refresh_status, | |
outputs=[status_count, status_output] | |
) | |
return demo | |
if __name__ == "__main__":
    logger.info("🚀 Starting Universal Multimodal MCP Chatbot Client...")
    demo = create_interface()
    logger.info("✅ Universal Multimodal MCP Chatbot Client interface created - launching app")
    # launch() blocks while the app is running; debug=True enables verbose Gradio error output
    demo.launch(debug=True)