Spaces:
Running
Running
| # -*- coding: utf-8 -*- | |
| """ | |
| MCP Client - Connects to Gradio MCP servers on Hugging Face Spaces | |
| Provides context/skills from MCP to enhance Gemini responses | |
| """ | |
| from gradio_client import Client | |
| from typing import Optional, Dict, Any, List | |
| import json | |
class MCPClient:
    """
    Client for connecting to MCP (Model Context Protocol) servers
    hosted on Hugging Face Spaces using Gradio.

    MCP provides CONTEXT and SKILLS that ground Gemini's responses,
    not a replacement for the LLM.

    All public methods report failures through their return value (a dict
    with ``"success": False`` or a string starting with the error marker)
    instead of raising, so they can be called directly from UI callbacks.
    """

    # Prefix that marks a string as an error message rather than MCP output.
    # NOTE(review): this marker (and the "π" prefixes in other messages) looks
    # like a mis-encoded emoji. It is kept byte-for-byte because callers may
    # match on it -- confirm the intended character against the original file.
    _ERROR_PREFIX = "β"

    # Marker that identifies a full Hugging Face Space page URL.
    _HF_SPACES_MARKER = "huggingface.co/spaces/"

    # Endpoints probed (in order) when looking for grounding context.
    _CONTEXT_ENDPOINTS = (
        "/get_context",
        "/info",
        "/describe",
        "/capabilities",
        "/handler",
    )

    # Endpoints probed (in order) by call_mcp; an int means "call by fn_index".
    _CALL_ENDPOINTS = (
        "/handler",  # Most common for MCP
        "/chat",
        "/predict",
        "/run",
        "/call",
        "/process",
        0,  # First endpoint by index
    )

    # Maximum number of characters of MCP output injected into the LLM context.
    _MAX_CONTEXT_CHARS = 500

    def __init__(self):
        self.client: Optional[Client] = None
        self.space_url: str = ""
        self.connected: bool = False
        self.api_info: Dict[str, Any] = {}
        self.available_endpoints: List[Dict[str, Any]] = []
        self.space_name: str = ""
        self.mcp_description: str = ""
        self.mcp_capabilities: List[str] = []

    def _clear_discovery(self) -> None:
        """Forget everything learned from a Space's API description."""
        self.api_info = {}
        self.available_endpoints = []
        self.mcp_description = ""
        self.mcp_capabilities = []

    @staticmethod
    def _normalize_space_id(space_url: str) -> str:
        """
        Reduce user input to the identifier handed to the Gradio client.

        A full ``huggingface.co/spaces/<owner>/<name>[/...]`` URL becomes
        ``<owner>/<name>``; any other value (a bare ``owner/name`` or a
        different URL) is passed through with surrounding whitespace, the
        query string, the fragment and trailing slashes removed.
        """
        cleaned = space_url.strip()
        # Query string and fragment never belong to the Space identifier.
        for separator in ("?", "#"):
            cleaned = cleaned.split(separator, 1)[0]

        marker = MCPClient._HF_SPACES_MARKER
        if marker in cleaned:
            tail = cleaned.split(marker, 1)[1]
            # Keep only "<owner>/<name>"; drop extra parts such as "/tree/main".
            segments = [segment for segment in tail.split("/") if segment]
            return "/".join(segments[:2])
        return cleaned.rstrip("/")

    def connect(self, space_url: str) -> Dict[str, Any]:
        """
        Connect to a Gradio MCP server on Hugging Face Spaces.

        Any state discovered from a previously connected Space is discarded
        first, so a failed (re)connection never reports stale capabilities.

        Args:
            space_url: URL to the HF Space (e.g., "MCP-1st-Birthday/QuantumArchitect-MCP")

        Returns:
            Dict with connection status and info. On success it contains
            "success", "message", "space_url", "capabilities" and
            "description"; on failure "success", "message" and "space_url".
        """
        try:
            self._clear_discovery()

            # Clean up the URL - extract space name if full URL provided
            space_url = self._normalize_space_id(space_url)
            self.space_url = space_url
            self.space_name = space_url.split("/")[-1]
            if not space_url:
                raise ValueError("No Space name or URL provided")

            print(f"π Connecting to MCP: {space_url}")

            # Create Gradio client
            self.client = Client(space_url)
            self.connected = True

            # Get API info and parse capabilities. This is best-effort: the
            # connection stays usable even when the description is unavailable.
            try:
                api_str = self.client.view_api(print_info=False, return_format="str")
                if api_str:
                    self.api_info = {"raw": str(api_str)[:2000]}
                    # Parse endpoints from API info
                    self._parse_mcp_capabilities(str(api_str))
            except Exception as e:
                self.api_info = {"note": f"Could not retrieve API info: {str(e)}"}

            return {
                "success": True,
                "message": f"β Connected to {self.space_name}",
                "space_url": space_url,
                "capabilities": self.mcp_capabilities,
                "description": self.mcp_description,
            }
        except Exception as e:
            # Boundary handler: report the failure to the caller instead of raising.
            self.connected = False
            self.client = None
            error_msg = str(e)
            print(f"β MCP connection failed: {error_msg}")
            return {
                "success": False,
                "message": f"β Failed to connect: {error_msg}",
                "space_url": space_url,
            }

    def _parse_mcp_capabilities(self, api_str: str) -> None:
        """
        Parse MCP capabilities from API info string.

        Scans each line for ``api_name="/..."`` (or the single-quoted form)
        and records the endpoint name together with the stripped line.
        Replaces ``available_endpoints``, ``mcp_capabilities`` and
        ``mcp_description``.
        """
        self.mcp_capabilities = []
        self.available_endpoints = []

        for line in api_str.split("\n"):
            # Look for endpoint definitions like "- predict(..., api_name="/handler")"
            for quote in ('"', "'"):
                _, marker, rest = line.partition(f"api_name={quote}/")
                if not marker:
                    continue
                name, closing, _ = rest.partition(quote)
                if closing:  # ignore lines where the quote is never closed
                    endpoint_name = "/" + name
                    self.available_endpoints.append(
                        {"name": endpoint_name, "line": line.strip()}
                    )
                    self.mcp_capabilities.append(f"Endpoint: {endpoint_name}")
                # The double-quoted form wins when both appear on one line.
                break

        # Generate description from space name
        self.mcp_description = (
            f"MCP Server: {self.space_name} with {len(self.available_endpoints)} endpoints"
        )

    def disconnect(self) -> Dict[str, Any]:
        """
        Disconnect from the MCP server and reset all connection state.

        Returns:
            Dict with "success" and "message" keys.
        """
        self.client = None
        self.connected = False
        self.space_url = ""
        self.space_name = ""
        self._clear_discovery()
        return {"success": True, "message": "Disconnected from MCP"}

    def get_context_for_llm(self, user_message: str) -> str:
        """
        Get MCP context to ground the LLM response.
        This queries relevant MCP endpoints and returns context for Gemini.

        Args:
            user_message: The user's message to find relevant context for

        Returns:
            Context string to prepend to Gemini's system prompt, or an empty
            string when no MCP server is connected.
        """
        if not self.connected or not self.client:
            return ""

        # Add MCP description
        context_parts = [f"You have access to the {self.space_name} MCP server."]

        # Try to get relevant information from MCP
        try:
            mcp_response = self._query_mcp_for_context(user_message)
            if mcp_response and not mcp_response.startswith(self._ERROR_PREFIX):
                context_parts.append(
                    f"\n**MCP Context ({self.space_name}):**\n{mcp_response}"
                )
        except Exception as e:
            context_parts.append(f"\nMCP available but query failed: {str(e)[:100]}")

        # Add available capabilities
        if self.mcp_capabilities:
            caps = ", ".join(self.mcp_capabilities[:5])
            context_parts.append(f"\nAvailable MCP capabilities: {caps}")

        return "\n".join(context_parts)

    def _query_mcp_for_context(self, message: str) -> str:
        """
        Query the MCP to get relevant context for the message.

        Probes the common context/info endpoints in order and returns the
        first usable answer, truncated to ``_MAX_CONTEXT_CHARS`` characters
        plus "...". Returns an empty string when nothing usable is found.
        """
        if not self.client:
            return ""

        for ep in self._CONTEXT_ENDPOINTS:
            try:
                result = self.client.predict(message, api_name=ep)
            except Exception:
                # Deliberate best-effort probing: most Spaces expose only
                # some of these endpoints, so a failure just means "try next".
                continue
            if result is None:
                continue
            formatted = self._format_result(result)
            if (
                formatted
                and len(formatted) > 10
                and not formatted.startswith(self._ERROR_PREFIX)
            ):
                # Truncate if too long
                if len(formatted) > self._MAX_CONTEXT_CHARS:
                    formatted = formatted[: self._MAX_CONTEXT_CHARS] + "..."
                return formatted
        return ""

    def _candidate_endpoints(self, endpoint: Any) -> List[Any]:
        """
        Build the ordered, duplicate-free list of endpoints call_mcp tries.

        The caller-supplied endpoint (if any) comes first. A string made only
        of decimal digits is treated as an endpoint index, matching the
        documented ``"0"`` form.
        """
        if isinstance(endpoint, str):
            text = endpoint.strip()
            explicit: Any = int(text) if text.isdecimal() else (text or None)
        else:
            explicit = endpoint if endpoint else None

        candidates: List[Any] = [] if explicit is None else [explicit]
        for ep in self._CALL_ENDPOINTS:
            if ep not in candidates:
                candidates.append(ep)
        return candidates

    def call_mcp(self, message: str, endpoint: str = "") -> str:
        """
        Call the MCP server with a message.

        The explicit endpoint (when given) is tried first, followed by a list
        of common handler names and finally the first endpoint by index. Each
        candidate is called at most once.

        Args:
            message: The user's message to send to the MCP
            endpoint: The API endpoint to call (e.g., "/handler" or index like "0")

        Returns:
            Response from the MCP server, or a message starting with the
            error marker when not connected or when no endpoint answered.
        """
        if not self.connected or not self.client:
            return "β Not connected to any MCP server. Please connect first."

        try:
            last_error = ""
            for ep in self._candidate_endpoints(endpoint):
                try:
                    if isinstance(ep, int):
                        result = self.client.predict(message, fn_index=ep)
                    else:
                        result = self.client.predict(message, api_name=ep)
                    if result is not None:
                        formatted = self._format_result(result)
                        if formatted and not formatted.startswith(self._ERROR_PREFIX):
                            return formatted
                except Exception as e:
                    # Remember why this candidate failed and move on.
                    last_error = str(e)
                    continue

            # If nothing worked, return helpful message
            return f"β Could not call MCP. Try specifying an endpoint.\nAvailable: {self.available_endpoints[:10]}...\nLast error: {last_error[:100]}"
        except Exception as e:
            return f"β Error calling MCP: {str(e)}"

    @staticmethod
    def _safe_json(value: Any) -> str:
        """Pretty-print *value* as JSON, falling back to str() if it cannot be encoded."""
        try:
            return json.dumps(value, indent=2)
        except (TypeError, ValueError):
            # Non-serializable or circular payloads must not discard the result.
            return str(value)

    def _format_result(self, result: Any) -> str:
        """
        Format the MCP result to a string.

        Strings are returned unchanged, dicts are pretty-printed as JSON, and
        for a non-empty list/tuple only the first element is used (as-is when
        it is a string, JSON otherwise). Everything else goes through str().
        """
        if isinstance(result, str):
            return result
        if isinstance(result, dict):
            return self._safe_json(result)
        if isinstance(result, (list, tuple)):
            # If it's a tuple/list, try to extract the main content
            if len(result) > 0:
                main_result = result[0]
                if isinstance(main_result, str):
                    return main_result
                return self._safe_json(main_result)
            return str(result)
        return str(result)

    def get_status(self) -> Dict[str, Any]:
        """
        Get current connection status.

        Returns:
            Dict with "connected", "space_url", "space_name" and "endpoints".
            The endpoint list is a copy, so callers cannot alter client state.
        """
        return {
            "connected": self.connected,
            "space_url": self.space_url,
            "space_name": self.space_name,
            "endpoints": list(self.available_endpoints),
        }

    def list_tools(self) -> str:
        """
        Try to list available tools/endpoints from the MCP.

        Returns:
            The API description reported by the Gradio client, or a plain
            message when not connected or when the description is unavailable.
        """
        if not self.connected or not self.client:
            return "Not connected to any MCP server."

        try:
            # Try to get API info
            api_info = self.client.view_api(print_info=False, return_format="str")
            return f"π Available API endpoints for {self.space_name}:\n\n{api_info}"
        except Exception as e:
            return f"Could not retrieve tools: {str(e)}"
# Singleton instance
# Shared, module-level client created at import time. Construction only
# initializes fields to their "not connected" defaults; no connection is
# opened until connect() is called on it.
mcp_client = MCPClient()
# Testing
# Manual smoke test, executed only when this file is run as a script.
# It uses its own MCPClient rather than the module-level singleton and
# walks through connect -> status -> list tools -> call -> disconnect.
# NOTE(review): the nesting under `if result["success"]:` is reconstructed
# (steps 2-5 inside, final message outside) -- confirm against the original.
if __name__ == "__main__":
    print("=" * 50)
    print("Testing MCP Client")
    print("=" * 50)
    client = MCPClient()
    # Test connection
    print("\n1. Testing connection to QuantumArchitect-MCP...")
    result = client.connect("https://huggingface.co/spaces/MCP-1st-Birthday/QuantumArchitect-MCP")
    print(f" Result: {result}")
    # The remaining steps need a live connection, so they are skipped on failure.
    if result["success"]:
        print("\n2. Getting status...")
        status = client.get_status()
        print(f" Status: {status}")
        print("\n3. Listing tools...")
        tools = client.list_tools()
        # Only the first 500 characters of the listing are printed.
        print(f" Tools: {tools[:500]}...")
        print("\n4. Testing MCP call...")
        response = client.call_mcp("Hello, what can you do?")
        # Only the first 200 characters of the response are printed.
        print(f" Response: {response[:200]}...")
        print("\n5. Disconnecting...")
        client.disconnect()
        print(f" Connected: {client.connected}")
    print("\nβ MCP Client test complete!")