#!/usr/bin/env python3
"""
MCP Server with sample tools for Hugging Face Spaces deployment
"""

import asyncio
import inspect
import json
import logging
import os
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

# JSON-RPC 2.0 allows string, number or null request identifiers.
RequestId = Union[int, str, None]

# Characters accepted by the "calculate" tool before the expression is evaluated.
_ALLOWED_EXPRESSION_CHARS = frozenset("0123456789+-*/()%. ")


# MCP Server implementation
class MCPServer:
    """Small JSON-RPC 2.0 dispatcher exposing registered tools and resources.

    Tools and resources are registered with the :meth:`tool` and
    :meth:`resource` decorators and stored in ``self.tools`` /
    ``self.resources``. :meth:`handle_request` routes one decoded request
    to the matching ``handle_*`` coroutine and always returns a JSON-RPC
    response dictionary.
    """

    def __init__(self, name: str = "hf-mcp-server", version: str = "1.0.0"):
        """Create a server with empty registries and configure logging.

        Args:
            name: Server name reported by ``initialize``; also the logger name.
            version: Server version reported by ``initialize``.
        """
        self.name = name
        self.version = version
        self.tools: Dict[str, Dict[str, Any]] = {}
        self.resources: Dict[str, Dict[str, Any]] = {}
        # Recorded once so the info resource can report the real start time.
        self.started_at = datetime.now()
        self.setup_logging()

    def setup_logging(self):
        """Configure root logging and create ``self.logger``."""
        # basicConfig logs to stderr by default, which keeps stdout free for
        # the protocol messages printed by the stdio loop.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(self.name)

    def tool(self, name: str, description: str = "", parameters: Optional[Dict] = None):
        """Decorator to register tools

        Args:
            name: Name clients use in ``tools/call``.
            description: Text reported by ``tools/list``.
            parameters: Mapping of parameter name to JSON-schema fragment;
                every listed parameter is reported as required.

        Returns:
            A decorator that stores the function and returns it unchanged.
        """
        def decorator(func):
            self.tools[name] = {
                "function": func,
                "description": description,
                "parameters": parameters or {}
            }
            return func
        return decorator

    def resource(self, uri: str, name: str = "", description: str = ""):
        """Decorator to register resources

        Args:
            uri: URI clients use in ``resources/read``.
            name: Display name reported by ``resources/list``.
            description: Text reported by ``resources/list``.

        Returns:
            A decorator that stores the function and returns it unchanged.
        """
        def decorator(func):
            self.resources[uri] = {
                "function": func,
                "name": name,
                "description": description
            }
            return func
        return decorator

    @staticmethod
    def _result(request_id: RequestId, result: Dict[str, Any]) -> Dict[str, Any]:
        """Build a JSON-RPC success envelope."""
        return {"jsonrpc": "2.0", "id": request_id, "result": result}

    @staticmethod
    def _error(request_id: RequestId, code: int, message: str) -> Dict[str, Any]:
        """Build a JSON-RPC error envelope."""
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": code, "message": message}
        }

    async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Handle incoming MCP requests

        Args:
            request: One decoded JSON-RPC message.

        Returns:
            A JSON-RPC response dictionary. Messages that are not JSON
            objects yield error -32600, unknown methods -32601, non-object
            ``params`` -32602 and unexpected failures -32603.
        """
        # A decoded JSON line may be a list, number or string; without this
        # guard the ``.get`` calls below would raise outside any handler.
        if not isinstance(request, dict):
            return self._error(None, -32600, "Invalid Request")

        # Echo the caller's id; never invent one, it could collide with a
        # real in-flight request.
        request_id = request.get("id")
        try:
            method = request.get("method")
            params = request.get("params")
            if params is None:
                params = {}
            if not isinstance(params, dict):
                return self._error(
                    request_id, -32602, "Invalid params: expected an object"
                )

            if method == "initialize":
                return await self.handle_initialize(request_id, params)
            elif method == "tools/list":
                return await self.handle_list_tools(request_id)
            elif method == "tools/call":
                return await self.handle_call_tool(request_id, params)
            elif method == "resources/list":
                return await self.handle_list_resources(request_id)
            elif method == "resources/read":
                return await self.handle_read_resource(request_id, params)
            else:
                return self._error(
                    request_id, -32601, f"Method not found: {method}"
                )
        except Exception as e:
            self.logger.exception("Error handling request: %s", e)
            return self._error(request_id, -32603, f"Internal error: {str(e)}")

    async def handle_initialize(self, request_id: RequestId, params: Dict) -> Dict:
        """Handle initialization request

        Returns the protocol version, the (empty) tools/resources capability
        objects and the server name and version. ``params`` is not inspected.
        """
        return self._result(request_id, {
            "protocolVersion": "2024-11-05",
            "capabilities": {
                "tools": {},
                "resources": {}
            },
            "serverInfo": {
                "name": self.name,
                "version": self.version
            }
        })

    async def handle_list_tools(self, request_id: RequestId) -> Dict:
        """Handle tools list request

        Every registered parameter is listed under ``required``.
        """
        tools_list = [
            {
                "name": name,
                "description": tool_info["description"],
                "inputSchema": {
                    "type": "object",
                    "properties": tool_info["parameters"],
                    "required": list(tool_info["parameters"])
                }
            }
            for name, tool_info in self.tools.items()
        ]
        return self._result(request_id, {"tools": tools_list})

    async def handle_call_tool(self, request_id: RequestId, params: Dict) -> Dict:
        """Handle tool call request

        Args:
            request_id: Identifier echoed in the response.
            params: Object with ``name`` and an optional ``arguments`` mapping
                that is passed to the tool as keyword arguments.

        Returns:
            The stringified tool result as a single text content item, error
            -32602 for an unknown tool, or -32603 if the tool raises.
        """
        tool_name = params.get("name")
        # An explicit null is treated like a missing "arguments" key.
        arguments = params.get("arguments") or {}

        if tool_name not in self.tools:
            return self._error(request_id, -32602, f"Tool not found: {tool_name}")

        try:
            tool_func = self.tools[tool_name]["function"]
            result = tool_func(**arguments)
            # Accept plain functions as well as coroutine functions.
            if inspect.isawaitable(result):
                result = await result
            return self._result(request_id, {
                "content": [
                    {
                        "type": "text",
                        "text": str(result)
                    }
                ]
            })
        except Exception as e:
            return self._error(
                request_id, -32603, f"Tool execution error: {str(e)}"
            )

    async def handle_list_resources(self, request_id: RequestId) -> Dict:
        """Handle resources list request

        Every resource is reported with the ``text/plain`` MIME type.
        """
        resources_list = [
            {
                "uri": uri,
                "name": resource_info["name"],
                "description": resource_info["description"],
                "mimeType": "text/plain"
            }
            for uri, resource_info in self.resources.items()
        ]
        return self._result(request_id, {"resources": resources_list})

    async def handle_read_resource(self, request_id: RequestId, params: Dict) -> Dict:
        """Handle resource read request

        Args:
            request_id: Identifier echoed in the response.
            params: Object with the ``uri`` of the resource to read.

        Returns:
            The stringified resource content, error -32602 for an unknown
            URI, or -32603 if the resource function raises.
        """
        uri = params.get("uri")

        if uri not in self.resources:
            return self._error(request_id, -32602, f"Resource not found: {uri}")

        try:
            resource_func = self.resources[uri]["function"]
            content = resource_func()
            # Accept plain functions as well as coroutine functions.
            if inspect.isawaitable(content):
                content = await content
            return self._result(request_id, {
                "contents": [
                    {
                        "uri": uri,
                        "mimeType": "text/plain",
                        "text": str(content)
                    }
                ]
            })
        except Exception as e:
            return self._error(
                request_id, -32603, f"Resource read error: {str(e)}"
            )


# Create server instance
server = MCPServer("hf-mcp-server", "1.0.0")


# Sample Tools
@server.tool(
    name="echo",
    description="Echo back the input text",
    parameters={
        "text": {
            "type": "string",
            "description": "Text to echo back"
        }
    }
)
async def echo_tool(text: str) -> str:
    """Simple echo tool"""
    return f"Echo: {text}"


@server.tool(
    name="current_time",
    description="Get the current timestamp",
    parameters={}
)
async def current_time_tool() -> str:
    """Get current time"""
    return f"Current time: {datetime.now().isoformat()}"


@server.tool(
    name="calculate",
    description="Perform basic mathematical calculations",
    parameters={
        "expression": {
            "type": "string",
            "description": "Mathematical expression to evaluate (e.g., '2 + 2')"
        }
    }
)
async def calculate_tool(expression: str) -> str:
    """Simple calculator tool

    Returns ``"Result: <expression> = <value>"`` on success and an
    ``"Error: ..."`` string for disallowed characters or evaluation failures.
    """
    # Basic safety check - only allow certain characters
    if not all(c in _ALLOWED_EXPRESSION_CHARS for c in expression):
        return "Error: Invalid characters in expression"
    try:
        # SECURITY NOTE(review): eval() on client-supplied text. The character
        # whitelist rules out names and attribute access, and builtins are
        # emptied as defence in depth, but "**" chains such as 9**9**9**9 can
        # still consume unbounded CPU/memory. Consider an ast-based evaluator
        # with operand limits before exposing this publicly.
        result = eval(expression, {"__builtins__": {}}, {})
        return f"Result: {expression} = {result}"
    except Exception as e:
        return f"Error: {str(e)}"


@server.tool(
    name="word_count",
    description="Count words in the given text",
    parameters={
        "text": {
            "type": "string",
            "description": "Text to count words in"
        }
    }
)
async def word_count_tool(text: str) -> str:
    """Count words in text

    Words are whitespace-separated; the "no spaces" count removes only the
    space character, not other whitespace.
    """
    words = text.split()
    chars = len(text)
    chars_no_spaces = len(text.replace(" ", ""))

    return f"Text analysis:\n- Words: {len(words)}\n- Characters: {chars}\n- Characters (no spaces): {chars_no_spaces}"


@server.tool(
    name="reverse_text",
    description="Reverse the given text",
    parameters={
        "text": {
            "type": "string",
            "description": "Text to reverse"
        }
    }
)
async def reverse_text_tool(text: str) -> str:
    """Reverse text"""
    return f"Reversed: {text[::-1]}"


# Sample Resources
@server.resource(
    uri="server://info",
    name="Server Information",
    description="Information about this MCP server"
)
async def server_info_resource() -> str:
    """Server information resource"""
    # "Started at" reports the time the server object was created, not the
    # time of this read.
    return f"""
MCP Server Information:
- Name: {server.name}
- Version: {server.version}
- Tools available: {len(server.tools)}
- Resources available: {len(server.resources)}
- Started at: {server.started_at.isoformat()}
- Environment: Hugging Face Spaces
"""


@server.resource(
    uri="server://capabilities",
    name="Server Capabilities",
    description="List of available tools and their descriptions"
)
async def capabilities_resource() -> str:
    """Server capabilities resource"""
    parts = ["Available Tools:\n\n"]
    parts.extend(
        f"- {name}: {tool_info['description']}\n"
        for name, tool_info in server.tools.items()
    )
    parts.append("\nAvailable Resources:\n\n")
    parts.extend(
        f"- {uri}: {resource_info['description']}\n"
        for uri, resource_info in server.resources.items()
    )
    return "".join(parts)


async def process_line(line: str) -> Optional[str]:
    """Process one line of stdio input.

    Args:
        line: Raw text expected to hold a single JSON-RPC message.

    Returns:
        The serialized JSON response, or ``None`` when the message is a
        notification (it has a ``method`` but no ``id``) and therefore must
        not be answered.
    """
    try:
        request = json.loads(line.strip())
    except json.JSONDecodeError:
        return json.dumps({
            "jsonrpc": "2.0",
            "id": None,
            "error": {
                "code": -32700,
                "message": "Parse error"
            }
        })

    response = await server.handle_request(request)

    # Notifications are still dispatched, but JSON-RPC 2.0 forbids replying.
    is_notification = (
        isinstance(request, dict)
        and "method" in request
        and "id" not in request
    )
    if is_notification:
        return None
    return json.dumps(response)


# Main execution function for stdio mode
async def main():
    """Main function for stdio-based MCP server"""
    server.logger.info("Starting MCP server in stdio mode")
    loop = asyncio.get_running_loop()

    try:
        while True:
            # Read from stdin in a worker thread so the event loop stays free.
            line = await loop.run_in_executor(None, sys.stdin.readline)
            if not line:
                # Empty string means EOF.
                break

            output = await process_line(line)
            if output is not None:
                print(output, flush=True)

    except KeyboardInterrupt:
        server.logger.info("Server shutting down")
    except Exception as e:
        server.logger.error(f"Server error: {e}")


if __name__ == "__main__":
    asyncio.run(main())