| """ | |
| Chat handling logic for Universal MCP Client - Fixed Version with File Upload Support | |
| """ | |
| import re | |
| import logging | |
| import traceback | |
| from datetime import datetime | |
| from typing import Dict, Any, List, Tuple, Optional | |
| import gradio as gr | |
| from gradio import ChatMessage | |
| from gradio_client import Client | |
| import time | |
| import json | |
| import httpx | |
| from config import AppConfig | |
| from mcp_client import UniversalMCPClient | |
| logger = logging.getLogger(__name__) | |


class ChatHandler:
    """Handles chat interactions with HF Inference Providers and MCP servers using ChatMessage dataclass"""

    def __init__(self, mcp_client: UniversalMCPClient):
        self.mcp_client = mcp_client
        # Initialize the file uploader client for converting local files to public URLs
        try:
            self.uploader_client = Client("abidlabs/file-uploader")
            logger.info("File uploader client initialized")
        except Exception as e:
            logger.error(f"Failed to initialize file uploader: {e}")
            self.uploader_client = None

    def _upload_file_to_gradio_server(self, file_path: str) -> str:
        """Upload a file to the Gradio server and get a public URL"""
        if not self.uploader_client:
            logger.error("File uploader client not initialized")
            return file_path

        try:
            # Open the file in binary mode and send it as a multipart upload
            with open(file_path, "rb") as f_:
                files = [("files", (file_path.split("/")[-1], f_))]
                r = httpx.post(
                    self.uploader_client.upload_url,
                    files=files,
                )
                r.raise_for_status()
                result = r.json()

            uploaded_path = result[0]
            # Construct the full public URL
            public_url = f"{self.uploader_client.src}/gradio_api/file={uploaded_path}"
            logger.info(f"Uploaded {file_path} -> {public_url}")
            return public_url

        except Exception as e:
            logger.error(f"Failed to upload file {file_path}: {e}")
            return file_path  # Return original path as fallback
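
    # Illustrative upload flow for the method above (a sketch of the assumed
    # behaviour of the "abidlabs/file-uploader" Space, not a verified contract):
    #
    #     handler = ChatHandler(mcp_client)                 # hypothetical instances
    #     url = handler._upload_file_to_gradio_server("/tmp/cat.png")
    #     # POST to uploader_client.upload_url with multipart field "files"
    #     # assumed JSON response: a list of server-side paths, e.g. ["/tmp/gradio/abc/cat.png"]
    #     # resulting public URL: f"{uploader_client.src}/gradio_api/file=/tmp/gradio/abc/cat.png"
    #
    # On any failure the method deliberately falls back to the local path so the
    # chat flow continues, even though remote MCP servers may not reach that file.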

    def process_multimodal_message(self, message: Dict[str, Any], history: List) -> Tuple[List[ChatMessage], Dict[str, Any]]:
        """Enhanced MCP chat function with multimodal input support and ChatMessage formatting"""
        if not self.mcp_client.hf_client:
            error_msg = "HuggingFace token not configured. Please set HF_TOKEN environment variable or login."
            history.append(ChatMessage(role="user", content=error_msg))
            history.append(ChatMessage(role="assistant", content=error_msg))
            return history, gr.MultimodalTextbox(value=None, interactive=False)

        if not self.mcp_client.current_provider or not self.mcp_client.current_model:
            error_msg = "Please select an inference provider and model first."
            history.append(ChatMessage(role="user", content=error_msg))
            history.append(ChatMessage(role="assistant", content=error_msg))
            return history, gr.MultimodalTextbox(value=None, interactive=False)

        # Initialize variables for error handling
        user_text = ""
        user_files = []
        uploaded_file_urls = []  # Store uploaded file URLs
        self.file_url_mapping = {}  # Map local paths to uploaded public URLs

        try:
            # Handle multimodal input - message is normally a dict with 'text' and 'files',
            # but may be a plain string for backward compatibility
            if isinstance(message, str):
                user_text = message
                user_files = []
            elif message:
                user_text = message.get("text", "")
                user_files = message.get("files", [])

            logger.info("Processing multimodal message:")
            logger.info(f"  Text: {user_text}")
            logger.info(f"  Files: {len(user_files)} files uploaded")
            logger.info(f"  History type: {type(history)}, length: {len(history)}")

            # Convert history to ChatMessage objects if needed
            converted_history = []
            for i, msg in enumerate(history):
                try:
                    if isinstance(msg, dict):
                        # Convert dict to ChatMessage for internal processing
                        logger.info(f"  Converting dict message {i}: {msg.get('role', 'unknown')}")
                        converted_history.append(ChatMessage(
                            role=msg.get('role', 'assistant'),
                            content=msg.get('content', ''),
                            metadata=msg.get('metadata', None)
                        ))
                    else:
                        # Already a ChatMessage
                        logger.info(f"  ChatMessage {i}: {getattr(msg, 'role', 'unknown')}")
                        converted_history.append(msg)
                except Exception as conv_error:
                    logger.error(f"Error converting message {i}: {conv_error}")
                    logger.error(f"Message content: {msg}")
                    # Skip problematic messages
                    continue

            history = converted_history

            # Upload files and get public URLs
            for file_path in user_files:
                logger.info(f"  Local file: {file_path}")
                try:
                    # Upload file to get a public URL
                    uploaded_url = self._upload_file_to_gradio_server(file_path)
                    # Store the mapping and remember the URL for the system prompt
                    self.file_url_mapping[file_path] = uploaded_url
                    uploaded_file_urls.append(uploaded_url)
                    logger.info(f"  Uploaded file URL: {uploaded_url}")
                    # Add to history with public URL
                    history.append(ChatMessage(role="user", content={"path": uploaded_url}))
                except Exception as upload_error:
                    logger.error(f"Failed to upload file {file_path}: {upload_error}")
                    # Fallback to local path with warning
                    history.append(ChatMessage(role="user", content={"path": file_path}))
                    logger.warning(f"Using local path for {file_path} - MCP servers may not be able to access it")

            # Add text message if provided
            if user_text and user_text.strip():
                history.append(ChatMessage(role="user", content=user_text))

            # If no text and no files, return early
            if not user_text.strip() and not user_files:
                return history, gr.MultimodalTextbox(value=None, interactive=False)

            # Create messages for HF Inference API
            messages = self._prepare_hf_messages(history, uploaded_file_urls)

            # Process the chat and get structured responses
            response_messages = self._call_hf_api(messages, uploaded_file_urls)

            # Add all response messages to history
            history.extend(response_messages)

            return history, gr.MultimodalTextbox(value=None, interactive=False)

        except Exception as e:
            error_msg = f"Error: {str(e)}"
            logger.error(f"Chat error: {e}")
            logger.error(traceback.format_exc())

            # Add user input to history if it exists
            if user_text and user_text.strip():
                history.append(ChatMessage(role="user", content=user_text))
            if user_files:
                for file_path in user_files:
                    history.append(ChatMessage(role="user", content={"path": file_path}))

            history.append(ChatMessage(role="assistant", content=error_msg))
            return history, gr.MultimodalTextbox(value=None, interactive=False)

    def _prepare_hf_messages(self, history: List, uploaded_file_urls: List[str] = None) -> List[Dict[str, Any]]:
        """Convert history (ChatMessage or dict) to HuggingFace Inference API format"""
        messages = []

        # Get optimal context settings for current model/provider
        if self.mcp_client.current_model and self.mcp_client.current_provider:
            context_settings = AppConfig.get_optimal_context_settings(
                self.mcp_client.current_model,
                self.mcp_client.current_provider,
                len(self.mcp_client.get_enabled_servers())
            )
            max_history = context_settings['recommended_history_limit']
        else:
            max_history = 20  # Fallback

        # Convert history to HF API format (text only for context)
        recent_history = history[-max_history:] if len(history) > max_history else history

        for msg in recent_history:
            # Handle both ChatMessage objects and dictionary format for backward compatibility
            if hasattr(msg, 'role'):  # ChatMessage object
                role = msg.role
                content = msg.content
            elif isinstance(msg, dict) and 'role' in msg:  # Dictionary format
                role = msg.get('role')
                content = msg.get('content')
            else:
                continue  # Skip invalid messages

            if role in ["user", "assistant"]:
                # Convert any non-string content to a string description for context
                if isinstance(content, dict):
                    if "path" in content:
                        file_path = content.get('path', 'unknown')
                        # Check if it's a public URL or local path
                        if file_path.startswith('http'):
                            # It's already a public URL
                            if AppConfig.is_image_file(file_path):
                                content = f"[User uploaded an image: {file_path}]"
                            elif AppConfig.is_audio_file(file_path):
                                content = f"[User uploaded an audio file: {file_path}]"
                            elif AppConfig.is_video_file(file_path):
                                content = f"[User uploaded a video file: {file_path}]"
                            else:
                                content = f"[User uploaded a file: {file_path}]"
                        else:
                            # Local path - mention it's not accessible to remote servers
                            content = f"[User uploaded a file (local path, not accessible to remote servers): {file_path}]"
                    else:
                        content = f"[Object: {str(content)[:50]}...]"
                elif isinstance(content, (list, tuple)):
                    content = f"[List: {str(content)[:50]}...]"
                elif content is None:
                    content = "[Empty]"
                else:
                    content = str(content)

                messages.append({
                    "role": role,
                    "content": content
                })

        return messages

    def _call_hf_api(self, messages: List[Dict[str, Any]], uploaded_file_urls: List[str] = None) -> List[ChatMessage]:
        """Call HuggingFace Inference API and return structured ChatMessage responses"""
        # Check if we have enabled MCP servers to use
        enabled_servers = self.mcp_client.get_enabled_servers()
        if not enabled_servers:
            return self._call_hf_without_mcp(messages)
        else:
            return self._call_hf_with_mcp(messages, uploaded_file_urls)

    def _call_hf_without_mcp(self, messages: List[Dict[str, Any]]) -> List[ChatMessage]:
        """Call HF Inference API without MCP servers"""
        logger.info("No MCP servers available, using regular HF Inference chat")

        system_prompt = self._get_native_system_prompt()

        # Add system prompt to messages
        if messages and messages[0].get("role") == "system":
            messages[0]["content"] = system_prompt + "\n\n" + messages[0]["content"]
        else:
            messages.insert(0, {"role": "system", "content": system_prompt})

        # Get optimal token settings
        if self.mcp_client.current_model and self.mcp_client.current_provider:
            context_settings = AppConfig.get_optimal_context_settings(
                self.mcp_client.current_model,
                self.mcp_client.current_provider,
                0  # No MCP servers
            )
            max_tokens = context_settings['max_response_tokens']
        else:
            max_tokens = 8192

        # Use HF Inference API
        try:
            response = self.mcp_client.generate_chat_completion(messages, max_tokens=max_tokens)
            response_text = response.choices[0].message.content

            if not response_text:
                response_text = "I understand your request and I'm here to help."

            return [ChatMessage(role="assistant", content=response_text)]

        except Exception as e:
            logger.error(f"HF Inference API call failed: {e}")
            return [ChatMessage(role="assistant", content=f"API call failed: {str(e)}")]

    def _call_hf_with_mcp(self, messages: List[Dict[str, Any]], uploaded_file_urls: List[str] = None) -> List[ChatMessage]:
        """Call HF Inference API with MCP servers and return structured responses"""
        # Enhanced system prompt with multimodal and MCP instructions
        system_prompt = self._get_mcp_system_prompt(uploaded_file_urls)

        # Add system prompt to messages
        if messages and messages[0].get("role") == "system":
            messages[0]["content"] = system_prompt + "\n\n" + messages[0]["content"]
        else:
            messages.insert(0, {"role": "system", "content": system_prompt})

        # Get optimal token settings
        enabled_servers = self.mcp_client.get_enabled_servers()
        if self.mcp_client.current_model and self.mcp_client.current_provider:
            context_settings = AppConfig.get_optimal_context_settings(
                self.mcp_client.current_model,
                self.mcp_client.current_provider,
                len(enabled_servers)
            )
            max_tokens = context_settings['max_response_tokens']
        else:
            max_tokens = 8192

        # Debug logging
        logger.info(f"Sending {len(messages)} messages to HF Inference API")
        logger.info(f"Using {len(enabled_servers)} enabled MCP servers")
        logger.info(f"Model: {self.mcp_client.current_model} via {self.mcp_client.current_provider}")
        logger.info(f"Max tokens: {max_tokens}")

        start_time = time.time()

        try:
            # Pass file mapping to MCP client
            if hasattr(self, 'file_url_mapping'):
                self.mcp_client.chat_handler_file_mapping = self.file_url_mapping

            # Call HF Inference with MCP tool support - using optimal max_tokens
            response = self.mcp_client.generate_chat_completion_with_mcp_tools(messages, max_tokens=max_tokens)
            return self._process_hf_response(response, start_time)

        except Exception as e:
            logger.error(f"HF Inference API call with MCP failed: {e}")
            return [ChatMessage(role="assistant", content=f"API call failed: {str(e)}")]

    def _process_hf_response(self, response, start_time: float) -> List[ChatMessage]:
        """Process HF Inference response with simplified media handling and nested errors"""
        chat_messages = []

        try:
            response_text = response.choices[0].message.content
            if not response_text:
                response_text = "I understand your request and I'm here to help."

            # Check if this response includes tool execution info
            if hasattr(response, '_tool_execution'):
                tool_info = response._tool_execution
                logger.info(f"Processing response with tool execution: {tool_info}")

                duration = round(time.time() - start_time, 2)
                tool_id = f"tool_{tool_info['tool']}_{int(time.time())}"

                if tool_info['success']:
                    tool_result = str(tool_info['result'])

                    # Extract media URL if present
                    media_url = self._extract_media_url(tool_result, tool_info.get('server', ''))

                    # Create tool usage metadata message
                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content="",
                        metadata={
                            "title": f"Used {tool_info['tool']}",
                            "status": "done",
                            "duration": duration,
                            "id": tool_id
                        }
                    ))

                    # Add nested success message with the raw result
                    if media_url:
                        result_preview = f"Successfully generated media\nURL: {media_url[:100]}..."
                    else:
                        result_preview = f"Tool executed successfully\nResult: {tool_result[:200]}..."

                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content=result_preview,
                        metadata={
                            "title": "Server Response",
                            "parent_id": tool_id,
                            "status": "done"
                        }
                    ))

                    # Add the LLM's descriptive text if present (before media)
                    if response_text and not response_text.startswith('{"use_tool"'):
                        # Clean the response text by removing URLs and tool JSON
                        clean_response = response_text
                        if media_url and media_url in clean_response:
                            clean_response = clean_response.replace(media_url, "").strip()

                        # Remove any remaining JSON tool call patterns
                        clean_response = re.sub(r'\{"use_tool"[^}]+\}', '', clean_response).strip()

                        # Remove all markdown link/image syntax completely
                        clean_response = re.sub(r'!\[([^\]]*)\]\([^)]*\)', '', clean_response)  # Remove image markdown
                        clean_response = re.sub(r'\[([^\]]*)\]\([^)]*\)', '', clean_response)  # Remove link markdown
                        clean_response = re.sub(r'!\[([^\]]*)\]', '', clean_response)  # Remove broken image refs
                        clean_response = re.sub(r'\[([^\]]*)\]', '', clean_response)  # Remove broken link refs
                        clean_response = re.sub(r'\(\s*\)', '', clean_response)  # Remove empty parentheses
                        clean_response = clean_response.strip()  # Final strip

                        # Only add if there's meaningful text left after cleaning
                        if clean_response and len(clean_response) > 10:
                            chat_messages.append(ChatMessage(
                                role="assistant",
                                content=clean_response
                            ))

                    # Handle media content if present
                    if media_url:
                        # Add media as a separate message - Gradio will auto-detect type
                        chat_messages.append(ChatMessage(
                            role="assistant",
                            content={"path": media_url}
                        ))
                    else:
                        # No media URL found, check if we need to show a non-media result
                        if not response_text or response_text.startswith('{"use_tool"'):
                            # Only show the result if there wasn't descriptive text from the LLM
                            if len(tool_result) > 500:
                                result_preview = f"Operation completed successfully. Result preview: {tool_result[:500]}..."
                            else:
                                result_preview = f"Operation completed successfully. Result: {tool_result}"

                            chat_messages.append(ChatMessage(
                                role="assistant",
                                content=result_preview
                            ))

                else:
                    # Tool execution failed
                    error_details = tool_info['result']

                    # Create main tool message with error status
                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content="",
                        metadata={
                            "title": f"Used {tool_info['tool']}",
                            "status": "error",
                            "duration": duration,
                            "id": tool_id
                        }
                    ))

                    # Add nested error response from server
                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content=f"Tool execution failed\n```\n{error_details}\n```",
                        metadata={
                            "title": "Server Response",
                            "parent_id": tool_id,
                            "status": "error"
                        }
                    ))

                    # Add suggestions as another nested message
                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content="**Suggestions:**\n• Try modifying your request slightly\n• Wait a moment and try again\n• Use a different MCP server if available",
                        metadata={
                            "title": "Possible Solutions",
                            "parent_id": tool_id,
                            "status": "info"
                        }
                    ))

            else:
                # No tool usage, just return the response
                chat_messages.append(ChatMessage(
                    role="assistant",
                    content=response_text
                ))

        except Exception as e:
            logger.error(f"Error processing HF response: {e}")
            logger.error(traceback.format_exc())
            chat_messages.append(ChatMessage(
                role="assistant",
                content="I understand your request and I'm here to help."
            ))

        return chat_messages

    def _extract_media_url(self, result_text: str, server_name: str) -> Optional[str]:
        """Extract media URL from MCP response with improved pattern matching"""
        if not isinstance(result_text, str):
            return None

        logger.info(f"Extracting media from result: {result_text[:500]}...")

        # Try JSON parsing first
        try:
            if result_text.strip().startswith('[') or result_text.strip().startswith('{'):
                data = json.loads(result_text.strip())

                # Handle array format
                if isinstance(data, list) and len(data) > 0:
                    item = data[0]
                    if isinstance(item, dict):
                        # Check for nested media structure
                        for media_type in ['audio', 'video', 'image']:
                            if media_type in item and isinstance(item[media_type], dict):
                                if 'url' in item[media_type]:
                                    url = item[media_type]['url'].strip('\'"')
                                    logger.info(f"Found {media_type} URL in JSON: {url}")
                                    return url
                        # Check for direct URL
                        if 'url' in item:
                            url = item['url'].strip('\'"')
                            logger.info(f"Found direct URL in JSON: {url}")
                            return url

                # Handle object format
                elif isinstance(data, dict):
                    # Check for nested media structure
                    for media_type in ['audio', 'video', 'image']:
                        if media_type in data and isinstance(data[media_type], dict):
                            if 'url' in data[media_type]:
                                url = data[media_type]['url'].strip('\'"')
                                logger.info(f"Found {media_type} URL in JSON: {url}")
                                return url
                    # Check for direct URL
                    if 'url' in data:
                        url = data['url'].strip('\'"')
                        logger.info(f"Found direct URL in JSON: {url}")
                        return url

        except json.JSONDecodeError:
            pass

        # Check for Gradio file URLs (common pattern)
        gradio_patterns = [
            r'https://[^/]+\.hf\.space/gradio_api/file=/[^/]+/[^/]+/[^\s"\'<>,]+',
            r'https://[^/]+\.hf\.space/file=[^\s"\'<>,]+',
            r'/gradio_api/file=/[^\s"\'<>,]+'
        ]

        for pattern in gradio_patterns:
            match = re.search(pattern, result_text)
            if match:
                url = match.group(0).rstrip('\'",:;')
                logger.info(f"Found Gradio file URL: {url}")
                return url

        # Check for any HTTP URLs with media extensions
        url_pattern = r'https?://[^\s"\'<>]+\.(?:mp3|wav|ogg|m4a|flac|aac|opus|wma|mp4|webm|avi|mov|mkv|m4v|wmv|png|jpg|jpeg|gif|webp|bmp|svg)'
        match = re.search(url_pattern, result_text, re.IGNORECASE)
        if match:
            url = match.group(0)
            logger.info(f"Found media URL by extension: {url}")
            return url

        # Check for data URLs
        if result_text.startswith('data:'):
            logger.info("Found data URL")
            return result_text

        logger.info("No media URL found in result")
        return None
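
    # Examples of tool results the extractor above is written to handle
    # (illustrative payloads, not taken from any specific server):
    #
    #     '[{"audio": {"url": "https://example.hf.space/gradio_api/file=/tmp/out.wav"}}]'
    #         -> nested media structure, returns the audio URL
    #     '{"url": "https://example.hf.space/file=abc/image.png"}'
    #         -> direct URL field
    #     'Result saved to https://cdn.example.com/clip.mp4'
    #         -> plain text, caught by the media-extension regex
    #     'data:image/png;base64,iVBORw0...'
    #         -> data URL, returned verbatim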

    def _get_native_system_prompt(self) -> str:
        """Get system prompt for HF Inference without MCP servers"""
        model_info = AppConfig.AVAILABLE_MODELS.get(self.mcp_client.current_model, {})
        context_length = model_info.get("context_length", 128000)

        return f"""You are an AI assistant powered by {self.mcp_client.current_model} via {self.mcp_client.current_provider}. You have native capabilities for:

- **Text Processing**: You can analyze, summarize, translate, and process text directly
- **General Knowledge**: You can answer questions, explain concepts, and have conversations
- **Code Analysis**: You can read, analyze, and explain code
- **Reasoning**: You can perform step-by-step reasoning and problem-solving
- **Context Window**: You have access to {context_length:,} tokens of context

Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

Please provide helpful, accurate, and engaging responses to user queries."""

    def _get_mcp_system_prompt(self, uploaded_file_urls: List[str] = None) -> str:
        """Get enhanced system prompt for HF Inference with MCP servers"""
        model_info = AppConfig.AVAILABLE_MODELS.get(self.mcp_client.current_model, {})
        context_length = model_info.get("context_length", 128000)

        uploaded_files_context = ""
        if uploaded_file_urls:
            uploaded_files_context = "\n\nFILES UPLOADED BY USER (Public URLs accessible to MCP servers):\n"
            for i, file_url in enumerate(uploaded_file_urls, 1):
                file_name = file_url.split('/')[-1] if '/' in file_url else file_url
                if AppConfig.is_image_file(file_url):
                    file_type = "Image"
                elif AppConfig.is_audio_file(file_url):
                    file_type = "Audio"
                elif AppConfig.is_video_file(file_url):
                    file_type = "Video"
                else:
                    file_type = "File"
                uploaded_files_context += f"{i}. {file_type}: {file_name}\n URL: {file_url}\n"

        # Get available tools with correct names from enabled servers only
        enabled_servers = self.mcp_client.get_enabled_servers()
        tools_info = []
        for server_name, config in enabled_servers.items():
            tools_info.append(f"- **{server_name}**: {config.description}")

        return f"""You are an AI assistant powered by {self.mcp_client.current_model} via {self.mcp_client.current_provider}, with access to various MCP tools.

YOUR NATIVE CAPABILITIES:
- **Text Processing**: You can analyze, summarize, translate, and process text directly
- **General Knowledge**: You can answer questions, explain concepts, and have conversations
- **Code Analysis**: You can read, analyze, and explain code
- **Reasoning**: You can perform step-by-step reasoning and problem-solving
- **Context Window**: You have access to {context_length:,} tokens of context

AVAILABLE MCP TOOLS:
You have access to the following MCP servers:
{chr(10).join(tools_info)}

WHEN TO USE MCP TOOLS:
- **Image Generation**: Creating new images from text prompts
- **Image Editing**: Modifying, enhancing, or transforming existing images
- **Audio Processing**: Transcribing audio, generating speech, audio enhancement
- **Video Processing**: Creating or editing videos
- **Text to Speech**: Converting text to audio
- **Specialized Analysis**: Tasks requiring specific models or APIs

TOOL USAGE FORMAT:
When you need to use an MCP tool, respond with JSON in this exact format:
{{"use_tool": true, "server": "exact_server_name", "tool": "exact_tool_name", "arguments": {{"param": "value"}}}}

IMPORTANT: Always describe what you're going to do BEFORE the JSON tool call. For example:
"I'll generate speech for your text using the TTS tool."
{{"use_tool": true, "server": "text to speech", "tool": "Kokoro_TTS_mcp_test_generate_first", "arguments": {{"text": "hello"}}}}

IMPORTANT TOOL NAME MAPPING:
- For TTS server: use tool name "Kokoro_TTS_mcp_test_generate_first"
- For image generation: use tool name "dalle_3_xl_lora_v2_generate"
- For video generation: use tool name "ysharma_ltx_video_distilledtext_to_video"
- For letter counting: use tool name "gradio_app_dummy1_letter_counter"

EXACT SERVER NAMES TO USE:
{', '.join([f'"{name}"' for name in enabled_servers.keys()])}

FILE HANDLING FOR MCP TOOLS:
When using MCP tools with uploaded files, always use the public URLs provided above.
These URLs are accessible to remote MCP servers.
{uploaded_files_context}

MEDIA HANDLING:
When tool results contain media URLs (images, audio, videos), the system will automatically embed them as playable media.

IMPORTANT NOTES:
- Always use the EXACT server names and tool names as specified above
- Use proper JSON format for tool calls
- Include all required parameters in arguments
- For file inputs to MCP tools, use the public URLs provided, not local paths
- ALWAYS provide a descriptive message before the JSON tool call
- After tool execution, you can provide additional context or ask if the user needs anything else

Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Current model: {self.mcp_client.current_model} via {self.mcp_client.current_provider}"""
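
    # Minimal wiring sketch for this handler (assumptions: UniversalMCPClient()
    # takes no required constructor arguments and the app uses a Blocks layout
    # roughly like the one below; adapt to the actual app.py in this Space):
    #
    #     mcp_client = UniversalMCPClient()
    #     handler = ChatHandler(mcp_client)
    #
    #     with gr.Blocks() as demo:
    #         chatbot = gr.Chatbot(type="messages")
    #         box = gr.MultimodalTextbox(file_count="multiple")
    #         # process_multimodal_message returns (history, reset MultimodalTextbox)
    #         box.submit(handler.process_multimodal_message, [box, chatbot], [chatbot, box])
    #
    #     demo.launch()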