Spaces:
Sleeping
Sleeping
from mcp.server.fastmcp import FastMCP | |
from typing import Dict, Any, Optional | |
import json | |
from datetime import datetime | |
# Initialize FastMCP server | |
mcp = FastMCP( | |
"RequestProcessor", # Name of the MCP server | |
instructions="You are a request processing assistant that can handle various types of requests and perform data processing tasks.", | |
host="0.0.0.0", | |
port=8001, | |
) | |
async def process_request(message: str, data: Optional[Dict[str, Any]] = None) -> str: | |
""" | |
Process various types of requests and perform data processing. | |
Args: | |
message (str): The main message or request description | |
data (Dict[str, Any], optional): Additional data to process | |
Returns: | |
str: Processing result and response | |
""" | |
try: | |
if data is None: | |
data = {} | |
# Process the request based on message content | |
processed_data = { | |
"original_message": message, | |
"processed_at": datetime.now().isoformat(), | |
"service_version": "1.0.0", | |
"data_size": len(str(data)), | |
"processing_status": "success" | |
} | |
# Add any additional processing logic here | |
if "analyze" in message.lower(): | |
processed_data["analysis_type"] = "text_analysis" | |
processed_data["word_count"] = len(message.split()) | |
if "validate" in message.lower(): | |
processed_data["validation_type"] = "data_validation" | |
processed_data["is_valid"] = True | |
return json.dumps(processed_data, indent=2, ensure_ascii=False) | |
except Exception as e: | |
return f"Error processing request: {str(e)}" | |
async def get_service_info() -> str: | |
""" | |
Get information about the request processing service. | |
Returns: | |
str: Service information | |
""" | |
info = { | |
"service_name": "RequestProcessor", | |
"version": "1.0.0", | |
"status": "running", | |
"capabilities": [ | |
"request_processing", | |
"data_analysis", | |
"text_validation", | |
"general_data_handling" | |
], | |
"description": "A versatile request processing service that can handle various types of data and requests" | |
} | |
return json.dumps(info, indent=2, ensure_ascii=False) | |
async def validate_data(data: Dict[str, Any]) -> str: | |
""" | |
Validate the structure and content of provided data. | |
Args: | |
data (Dict[str, Any]): Data to validate | |
Returns: | |
str: Validation results | |
""" | |
try: | |
validation_result = { | |
"is_valid": True, | |
"validation_timestamp": datetime.now().isoformat(), | |
"data_type": type(data).__name__, | |
"data_keys": list(data.keys()) if isinstance(data, dict) else [], | |
"data_size": len(str(data)), | |
"issues": [] | |
} | |
# Basic validation logic | |
if not data: | |
validation_result["is_valid"] = False | |
validation_result["issues"].append("Data is empty") | |
if isinstance(data, dict): | |
for key, value in data.items(): | |
if value is None: | |
validation_result["issues"].append(f"Key '{key}' has null value") | |
if validation_result["issues"]: | |
validation_result["is_valid"] = False | |
return json.dumps(validation_result, indent=2, ensure_ascii=False) | |
except Exception as e: | |
return f"Error during validation: {str(e)}" | |
if __name__ == "__main__": | |
# Start the MCP server with stdio transport | |
mcp.run(transport="stdio") |