"""Chat handling logic for the Universal MCP Client, with file upload support."""

import json
import logging
import os
import re
import time
import traceback
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Union

import gradio as gr
import httpx
from gradio import ChatMessage
from gradio_client import Client

from config import AppConfig
from mcp_client import UniversalMCPClient

logger = logging.getLogger(__name__)


class ChatHandler:
    """Handles chat interactions with HF Inference Providers and MCP servers using the ChatMessage dataclass."""

    def __init__(self, mcp_client: UniversalMCPClient):
        self.mcp_client = mcp_client
        # Maps local file paths to the public URLs they were uploaded to, so
        # MCP tool calls can substitute accessible URLs for local paths.
        self.file_url_mapping: Dict[str, str] = {}

        try:
            self.uploader_client = Client("abidlabs/file-uploader")
            logger.info("✅ File uploader client initialized")
        except Exception as e:
            logger.error(f"Failed to initialize file uploader: {e}")
            self.uploader_client = None
    def _upload_file_to_gradio_server(self, file_path: str) -> str:
        """Upload a file to the Gradio server and return a public URL (or the local path on failure)."""
        if not self.uploader_client:
            logger.error("File uploader client not initialized")
            return file_path

        try:
            # POST the file to the Gradio upload endpoint; the response is a
            # JSON list of server-side paths for the uploaded files.
            with open(file_path, "rb") as f_:
                files = [("files", (os.path.basename(file_path), f_))]
                r = httpx.post(
                    self.uploader_client.upload_url,
                    files=files,
                )
            r.raise_for_status()
            result = r.json()
            uploaded_path = result[0]

            # Build a publicly reachable URL from the server-side path.
            public_url = f"{self.uploader_client.src}/gradio_api/file={uploaded_path}"
            logger.info(f"✅ Uploaded {file_path} -> {public_url}")
            return public_url
        except Exception as e:
            logger.error(f"Failed to upload file {file_path}: {e}")
            return file_path
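
    # A quick sketch of the round trip above, assuming the standard Gradio
    # /upload endpoint (response shapes can differ across Gradio versions,
    # so treat this as illustrative, not a contract):
    #
    #   POST {src}/upload  with multipart field "files"
    #   -> ["/tmp/gradio/<hash>/report.pdf"]                        # server-side path
    #   -> "{src}/gradio_api/file=/tmp/gradio/<hash>/report.pdf"    # public URL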

    def process_multimodal_message(self, message: Union[Dict[str, Any], str], history: List) -> Tuple[List[ChatMessage], gr.MultimodalTextbox]:
        """Enhanced MCP chat entry point with multimodal input support and ChatMessage formatting."""
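        # `message` is typically the value of a gr.MultimodalTextbox, shaped
        # like the sketch below (file paths are illustrative); a bare string
        # is also tolerated for plain-text callers:
        #
        #   {"text": "describe this image", "files": ["/tmp/gradio/abc/cat.png"]}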
        if not self.mcp_client.hf_client:
            error_msg = "❌ HuggingFace token not configured. Please set the HF_TOKEN environment variable or log in."
            history.append(ChatMessage(role="assistant", content=error_msg))
            return history, gr.MultimodalTextbox(value=None, interactive=False)

        if not self.mcp_client.current_provider or not self.mcp_client.current_model:
            error_msg = "❌ Please select an inference provider and model first."
            history.append(ChatMessage(role="assistant", content=error_msg))
            return history, gr.MultimodalTextbox(value=None, interactive=False)

        user_text = ""
        user_files = []
        uploaded_file_urls = []
        self.file_url_mapping = {}

        try:
            # Accept either the MultimodalTextbox dict or a bare string.
            if isinstance(message, str):
                user_text = message
                user_files = []
            else:
                user_text = message.get("text", "") if message else ""
                user_files = message.get("files", []) if message else []

            logger.info("💬 Processing multimodal message:")
            logger.info(f"  📝 Text: {user_text}")
            logger.info(f"  📁 Files: {len(user_files)} files uploaded")
            logger.info(f"  📜 History type: {type(history)}, length: {len(history)}")

            # Normalize history entries to ChatMessage objects.
            converted_history = []
            for i, msg in enumerate(history):
                try:
                    if isinstance(msg, dict):
                        logger.info(f"  🔄 Converting dict message {i}: {msg.get('role', 'unknown')}")
                        converted_history.append(ChatMessage(
                            role=msg.get('role', 'assistant'),
                            content=msg.get('content', ''),
                            metadata=msg.get('metadata', None)
                        ))
                    else:
                        logger.info(f"  ✅ ChatMessage {i}: {getattr(msg, 'role', 'unknown')}")
                        converted_history.append(msg)
                except Exception as conv_error:
                    logger.error(f"Error converting message {i}: {conv_error}")
                    logger.error(f"Message content: {msg}")
                    continue

            history = converted_history

            # Upload each file so remote MCP servers can reach it; fall back
            # to the local path if the upload fails.
            for file_path in user_files:
                logger.info(f"  📎 Local file: {file_path}")
                try:
                    uploaded_url = self._upload_file_to_gradio_server(file_path)
                    self.file_url_mapping[file_path] = uploaded_url
                    uploaded_file_urls.append(uploaded_url)
                    logger.info(f"  ✅ Uploaded file URL: {uploaded_url}")

                    history.append(ChatMessage(role="user", content={"path": uploaded_url}))
                except Exception as upload_error:
                    logger.error(f"Failed to upload file {file_path}: {upload_error}")
                    history.append(ChatMessage(role="user", content={"path": file_path}))
                    logger.warning(f"⚠️ Using local path for {file_path} - MCP servers may not be able to access it")

            if user_text and user_text.strip():
                history.append(ChatMessage(role="user", content=user_text))

            # Nothing to do for an empty submission.
            if not user_text.strip() and not user_files:
                return history, gr.MultimodalTextbox(value=None, interactive=False)

            messages = self._prepare_hf_messages(history, uploaded_file_urls)
            response_messages = self._call_hf_api(messages, uploaded_file_urls)
            history.extend(response_messages)

            return history, gr.MultimodalTextbox(value=None, interactive=False)

        except Exception as e:
            error_msg = f"❌ Error: {str(e)}"
            logger.error(f"Chat error: {e}")
            logger.error(traceback.format_exc())

            # Preserve the user's input in the history before reporting the error.
            if user_text and user_text.strip():
                history.append(ChatMessage(role="user", content=user_text))
            if user_files:
                for file_path in user_files:
                    history.append(ChatMessage(role="user", content={"path": file_path}))

            history.append(ChatMessage(role="assistant", content=error_msg))
            return history, gr.MultimodalTextbox(value=None, interactive=False)

    def _prepare_hf_messages(self, history: List, uploaded_file_urls: List[str] = None) -> List[Dict[str, Any]]:
        """Convert history (ChatMessage or dict) to the HuggingFace Inference API format."""
        messages = []

        # Trim history to a length the current model/provider can handle.
        if self.mcp_client.current_model and self.mcp_client.current_provider:
            context_settings = AppConfig.get_optimal_context_settings(
                self.mcp_client.current_model,
                self.mcp_client.current_provider,
                len(self.mcp_client.get_enabled_servers())
            )
            max_history = context_settings['recommended_history_limit']
        else:
            max_history = 20

        recent_history = history[-max_history:] if len(history) > max_history else history

        for msg in recent_history:
            if hasattr(msg, 'role'):
                role = msg.role
                content = msg.content
            elif isinstance(msg, dict) and 'role' in msg:
                role = msg.get('role')
                content = msg.get('content')
            else:
                continue

            if role in ["user", "assistant"]:
                # Flatten non-text content into a textual description the model can read.
                if isinstance(content, dict):
                    if "path" in content:
                        file_path = content.get('path', 'unknown')
                        if file_path.startswith('http'):
                            if AppConfig.is_image_file(file_path):
                                content = f"[User uploaded an image: {file_path}]"
                            elif AppConfig.is_audio_file(file_path):
                                content = f"[User uploaded an audio file: {file_path}]"
                            elif AppConfig.is_video_file(file_path):
                                content = f"[User uploaded a video file: {file_path}]"
                            else:
                                content = f"[User uploaded a file: {file_path}]"
                        else:
                            content = f"[User uploaded a file (local path, not accessible to remote servers): {file_path}]"
                    else:
                        content = f"[Object: {str(content)[:50]}...]"
                elif isinstance(content, (list, tuple)):
                    content = f"[List: {str(content)[:50]}...]"
                elif content is None:
                    content = "[Empty]"
                else:
                    content = str(content)

                messages.append({
                    "role": role,
                    "content": content
                })

        return messages
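
    # A minimal sketch of the shape _prepare_hf_messages produces (contents
    # are illustrative, not taken from a real session):
    #
    #   [{"role": "user", "content": "[User uploaded an image: https://.../cat.png]"},
    #    {"role": "user", "content": "describe this image"},
    #    {"role": "assistant", "content": "It's a cat."}]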

    def _call_hf_api(self, messages: List[Dict[str, Any]], uploaded_file_urls: List[str] = None) -> List[ChatMessage]:
        """Call the HuggingFace Inference API and return structured ChatMessage responses."""
        # Route to the plain chat path when no MCP servers are enabled.
        enabled_servers = self.mcp_client.get_enabled_servers()
        if not enabled_servers:
            return self._call_hf_without_mcp(messages)
        else:
            return self._call_hf_with_mcp(messages, uploaded_file_urls)

    def _call_hf_without_mcp(self, messages: List[Dict[str, Any]]) -> List[ChatMessage]:
        """Call the HF Inference API without MCP servers."""
        logger.info("💬 No MCP servers available, using regular HF Inference chat")

        system_prompt = self._get_native_system_prompt()

        # Prepend (or merge into) the system message.
        if messages and messages[0].get("role") == "system":
            messages[0]["content"] = system_prompt + "\n\n" + messages[0]["content"]
        else:
            messages.insert(0, {"role": "system", "content": system_prompt})

        if self.mcp_client.current_model and self.mcp_client.current_provider:
            context_settings = AppConfig.get_optimal_context_settings(
                self.mcp_client.current_model,
                self.mcp_client.current_provider,
                0
            )
            max_tokens = context_settings['max_response_tokens']
        else:
            max_tokens = 8192

        try:
            response = self.mcp_client.generate_chat_completion(messages, max_tokens=max_tokens)
            response_text = response.choices[0].message.content

            if not response_text:
                response_text = "I understand your request and I'm here to help."

            return [ChatMessage(role="assistant", content=response_text)]
        except Exception as e:
            logger.error(f"HF Inference API call failed: {e}")
            return [ChatMessage(role="assistant", content=f"❌ API call failed: {str(e)}")]

    def _call_hf_with_mcp(self, messages: List[Dict[str, Any]], uploaded_file_urls: List[str] = None) -> List[ChatMessage]:
        """Call the HF Inference API with MCP servers and return structured responses."""
        system_prompt = self._get_mcp_system_prompt(uploaded_file_urls)

        # Prepend (or merge into) the system message.
        if messages and messages[0].get("role") == "system":
            messages[0]["content"] = system_prompt + "\n\n" + messages[0]["content"]
        else:
            messages.insert(0, {"role": "system", "content": system_prompt})

        enabled_servers = self.mcp_client.get_enabled_servers()
        if self.mcp_client.current_model and self.mcp_client.current_provider:
            context_settings = AppConfig.get_optimal_context_settings(
                self.mcp_client.current_model,
                self.mcp_client.current_provider,
                len(enabled_servers)
            )
            max_tokens = context_settings['max_response_tokens']
        else:
            max_tokens = 8192

        logger.info(f"📤 Sending {len(messages)} messages to HF Inference API")
        logger.info(f"🔧 Using {len(enabled_servers)} of {len(self.mcp_client.servers)} MCP servers")
        logger.info(f"🤖 Model: {self.mcp_client.current_model} via {self.mcp_client.current_provider}")
        logger.info(f"📊 Max tokens: {max_tokens}")

        start_time = time.time()

        try:
            # Share the local-path -> public-URL mapping with the MCP client
            # so tool arguments can be rewritten to accessible URLs.
            if hasattr(self, 'file_url_mapping'):
                self.mcp_client.chat_handler_file_mapping = self.file_url_mapping

            response = self.mcp_client.generate_chat_completion_with_mcp_tools(messages, max_tokens=max_tokens)

            return self._process_hf_response(response, start_time)
        except Exception as e:
            logger.error(f"HF Inference API call with MCP failed: {e}")
            return [ChatMessage(role="assistant", content=f"❌ API call failed: {str(e)}")]
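
    # Sketch of the `_tool_execution` attribute consumed below. It is attached
    # by UniversalMCPClient when a tool call was made; the keys shown are the
    # ones _process_hf_response reads, with illustrative values:
    #
    #   response._tool_execution = {
    #       "tool": "dalle_3_xl_lora_v2_generate",   # tool name (example)
    #       "server": "image generation",            # server name (example)
    #       "success": True,                         # False on failure
    #       "result": "...",                         # tool output or error details
    #   }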

    def _process_hf_response(self, response, start_time: float) -> List[ChatMessage]:
        """Process an HF Inference response with simplified media handling and nested error reporting."""
        chat_messages = []

        try:
            response_text = response.choices[0].message.content

            if not response_text:
                response_text = "I understand your request and I'm here to help."

            if hasattr(response, '_tool_execution'):
                tool_info = response._tool_execution
                logger.info(f"🔧 Processing response with tool execution: {tool_info}")

                duration = round(time.time() - start_time, 2)
                tool_id = f"tool_{tool_info['tool']}_{int(time.time())}"

                if tool_info['success']:
                    tool_result = str(tool_info['result'])

                    media_url = self._extract_media_url(tool_result, tool_info.get('server', ''))

                    # Collapsible "tool used" header message.
                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content="",
                        metadata={
                            "title": f"🔧 Used {tool_info['tool']}",
                            "status": "done",
                            "duration": duration,
                            "id": tool_id
                        }
                    ))

                    if media_url:
                        result_preview = f"✅ Successfully generated media\nURL: {media_url[:100]}..."
                    else:
                        result_preview = f"✅ Tool executed successfully\nResult: {tool_result[:200]}..."

                    # Nested server-response message under the tool header.
                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content=result_preview,
                        metadata={
                            "title": "📋 Server Response",
                            "parent_id": tool_id,
                            "status": "done"
                        }
                    ))

                    if response_text and not response_text.startswith('{"use_tool"'):
                        # Strip the media URL, leftover tool-call JSON, and
                        # markdown link/image syntax from the narration.
                        clean_response = response_text
                        if media_url and media_url in clean_response:
                            clean_response = clean_response.replace(media_url, "").strip()

                        clean_response = re.sub(r'\{"use_tool"[^}]+\}', '', clean_response).strip()

                        clean_response = re.sub(r'!\[([^\]]*)\]\([^)]*\)', '', clean_response)
                        clean_response = re.sub(r'\[([^\]]*)\]\([^)]*\)', '', clean_response)
                        clean_response = re.sub(r'!\[([^\]]*)\]', '', clean_response)
                        clean_response = re.sub(r'\[([^\]]*)\]', '', clean_response)
                        clean_response = re.sub(r'\(\s*\)', '', clean_response)
                        clean_response = clean_response.strip()

                        if clean_response and len(clean_response) > 10:
                            chat_messages.append(ChatMessage(
                                role="assistant",
                                content=clean_response
                            ))

                    if media_url:
                        # Embed the media so Gradio renders it inline.
                        chat_messages.append(ChatMessage(
                            role="assistant",
                            content={"path": media_url}
                        ))
                    else:
                        # No media: if the model produced only the tool-call
                        # JSON, summarize the raw tool result instead.
                        if not response_text or response_text.startswith('{"use_tool"'):
                            if len(tool_result) > 500:
                                result_preview = f"Operation completed successfully. Result preview: {tool_result[:500]}..."
                            else:
                                result_preview = f"Operation completed successfully. Result: {tool_result}"

                            chat_messages.append(ChatMessage(
                                role="assistant",
                                content=result_preview
                            ))

                else:
                    error_details = tool_info['result']

                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content="",
                        metadata={
                            "title": f"❌ Used {tool_info['tool']}",
                            "status": "error",
                            "duration": duration,
                            "id": tool_id
                        }
                    ))

                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content=f"❌ Tool execution failed\n```\n{error_details}\n```",
                        metadata={
                            "title": "📋 Server Response",
                            "parent_id": tool_id,
                            "status": "error"
                        }
                    ))

                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content="**Suggestions:**\n• Try modifying your request slightly\n• Wait a moment and try again\n• Use a different MCP server if available",
                        metadata={
                            "title": "💡 Possible Solutions",
                            "parent_id": tool_id,
                            "status": "info"
                        }
                    ))
            else:
                # No tool execution: return the plain model response.
                chat_messages.append(ChatMessage(
                    role="assistant",
                    content=response_text
                ))

        except Exception as e:
            logger.error(f"Error processing HF response: {e}")
            logger.error(traceback.format_exc())
            chat_messages.append(ChatMessage(
                role="assistant",
                content="I understand your request and I'm here to help."
            ))

        return chat_messages

    def _extract_media_url(self, result_text: str, server_name: str) -> Optional[str]:
        """Extract a media URL from an MCP response, trying structured JSON first, then URL patterns."""
        if not isinstance(result_text, str):
            return None

        logger.info(f"🔍 Extracting media from result: {result_text[:500]}...")

        # 1. Structured JSON: look for audio/video/image objects or a direct "url" key.
        try:
            if result_text.strip().startswith('[') or result_text.strip().startswith('{'):
                data = json.loads(result_text.strip())

                if isinstance(data, list) and len(data) > 0:
                    item = data[0]
                    if isinstance(item, dict):
                        for media_type in ['audio', 'video', 'image']:
                            if media_type in item and isinstance(item[media_type], dict):
                                if 'url' in item[media_type]:
                                    url = item[media_type]['url'].strip('\'"')
                                    logger.info(f"🎯 Found {media_type} URL in JSON: {url}")
                                    return url

                        if 'url' in item:
                            url = item['url'].strip('\'"')
                            logger.info(f"🎯 Found direct URL in JSON: {url}")
                            return url

                elif isinstance(data, dict):
                    for media_type in ['audio', 'video', 'image']:
                        if media_type in data and isinstance(data[media_type], dict):
                            if 'url' in data[media_type]:
                                url = data[media_type]['url'].strip('\'"')
                                logger.info(f"🎯 Found {media_type} URL in JSON: {url}")
                                return url

                    if 'url' in data:
                        url = data['url'].strip('\'"')
                        logger.info(f"🎯 Found direct URL in JSON: {url}")
                        return url

        except json.JSONDecodeError:
            pass

        # 2. Gradio file URLs.
        gradio_patterns = [
            r'https://[^/]+\.hf\.space/gradio_api/file=/[^/]+/[^/]+/[^\s"\'<>,]+',
            r'https://[^/]+\.hf\.space/file=[^\s"\'<>,]+',
            r'/gradio_api/file=/[^\s"\'<>,]+'
        ]

        for pattern in gradio_patterns:
            match = re.search(pattern, result_text)
            if match:
                url = match.group(0).rstrip('\'",:;')
                logger.info(f"🎯 Found Gradio file URL: {url}")
                return url

        # 3. Any URL ending in a known media file extension.
        url_pattern = r'https?://[^\s"\'<>]+\.(?:mp3|wav|ogg|m4a|flac|aac|opus|wma|mp4|webm|avi|mov|mkv|m4v|wmv|png|jpg|jpeg|gif|webp|bmp|svg)'
        match = re.search(url_pattern, result_text, re.IGNORECASE)
        if match:
            url = match.group(0)
            logger.info(f"🎯 Found media URL by extension: {url}")
            return url

        # 4. Raw data URLs.
        if result_text.startswith('data:'):
            logger.info("🎯 Found data URL")
            return result_text

        logger.info("❌ No media URL found in result")
        return None
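
    # Illustrative inputs _extract_media_url handles (all values made up):
    #
    #   '[{"image": {"url": "https://x.hf.space/gradio_api/file=/tmp/a/b/out.png"}}]'  -> JSON branch
    #   'Saved to https://x.hf.space/file=outputs/clip.mp4'                            -> Gradio pattern
    #   'See https://cdn.example.com/track.mp3'                                        -> extension match
    #   'data:image/png;base64,iVBORw0...'                                             -> data URL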

    def _get_native_system_prompt(self) -> str:
        """Get the system prompt for HF Inference without MCP servers."""
        model_info = AppConfig.AVAILABLE_MODELS.get(self.mcp_client.current_model, {})
        context_length = model_info.get("context_length", 128000)

        return f"""You are an AI assistant powered by {self.mcp_client.current_model} via {self.mcp_client.current_provider}. You have native capabilities for:

- **Text Processing**: You can analyze, summarize, translate, and process text directly
- **General Knowledge**: You can answer questions, explain concepts, and have conversations
- **Code Analysis**: You can read, analyze, and explain code
- **Reasoning**: You can perform step-by-step reasoning and problem-solving
- **Context Window**: You have access to {context_length:,} tokens of context

Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

Please provide helpful, accurate, and engaging responses to user queries."""

    def _get_mcp_system_prompt(self, uploaded_file_urls: List[str] = None) -> str:
        """Get the enhanced system prompt for HF Inference with MCP servers."""
        model_info = AppConfig.AVAILABLE_MODELS.get(self.mcp_client.current_model, {})
        context_length = model_info.get("context_length", 128000)

        # Describe any uploaded files so the model passes public URLs to tools.
        uploaded_files_context = ""
        if uploaded_file_urls:
            uploaded_files_context = "\n\nFILES UPLOADED BY USER (Public URLs accessible to MCP servers):\n"
            for i, file_url in enumerate(uploaded_file_urls, 1):
                file_name = file_url.split('/')[-1] if '/' in file_url else file_url
                if AppConfig.is_image_file(file_url):
                    file_type = "Image"
                elif AppConfig.is_audio_file(file_url):
                    file_type = "Audio"
                elif AppConfig.is_video_file(file_url):
                    file_type = "Video"
                else:
                    file_type = "File"
                uploaded_files_context += f"{i}. {file_type}: {file_name}\n   URL: {file_url}\n"

        enabled_servers = self.mcp_client.get_enabled_servers()
        tools_info = []
        for server_name, config in enabled_servers.items():
            tools_info.append(f"- **{server_name}**: {config.description}")

        return f"""You are an AI assistant powered by {self.mcp_client.current_model} via {self.mcp_client.current_provider}, with access to various MCP tools.

YOUR NATIVE CAPABILITIES:
- **Text Processing**: You can analyze, summarize, translate, and process text directly
- **General Knowledge**: You can answer questions, explain concepts, and have conversations
- **Code Analysis**: You can read, analyze, and explain code
- **Reasoning**: You can perform step-by-step reasoning and problem-solving
- **Context Window**: You have access to {context_length:,} tokens of context

AVAILABLE MCP TOOLS:
You have access to the following MCP servers:
{chr(10).join(tools_info)}

WHEN TO USE MCP TOOLS:
- **Image Generation**: Creating new images from text prompts
- **Image Editing**: Modifying, enhancing, or transforming existing images
- **Audio Processing**: Transcribing audio, generating speech, audio enhancement
- **Video Processing**: Creating or editing videos
- **Text to Speech**: Converting text to audio
- **Specialized Analysis**: Tasks requiring specific models or APIs

TOOL USAGE FORMAT:
When you need to use an MCP tool, respond with JSON in this exact format:
{{"use_tool": true, "server": "exact_server_name", "tool": "exact_tool_name", "arguments": {{"param": "value"}}}}

IMPORTANT: Always describe what you're going to do BEFORE the JSON tool call. For example:
"I'll generate speech for your text using the TTS tool."
{{"use_tool": true, "server": "text to speech", "tool": "Kokoro_TTS_mcp_test_generate_first", "arguments": {{"text": "hello"}}}}

IMPORTANT TOOL NAME MAPPING:
- For TTS server: use tool name "Kokoro_TTS_mcp_test_generate_first"
- For image generation: use tool name "dalle_3_xl_lora_v2_generate"
- For video generation: use tool name "ysharma_ltx_video_distilledtext_to_video"
- For letter counting: use tool name "gradio_app_dummy1_letter_counter"

EXACT SERVER NAMES TO USE:
{', '.join([f'"{name}"' for name in enabled_servers.keys()])}

FILE HANDLING FOR MCP TOOLS:
When using MCP tools with uploaded files, always use the public URLs provided above.
These URLs are accessible to remote MCP servers.
{uploaded_files_context}

MEDIA HANDLING:
When tool results contain media URLs (images, audio, videos), the system will automatically embed them as playable media.

IMPORTANT NOTES:
- Always use the EXACT server names and tool names as specified above
- Use proper JSON format for tool calls
- Include all required parameters in arguments
- For file inputs to MCP tools, use the public URLs provided, not local paths
- ALWAYS provide a descriptive message before the JSON tool call
- After tool execution, you can provide additional context or ask if the user needs anything else

Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Current model: {self.mcp_client.current_model} via {self.mcp_client.current_provider}"""
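

# A minimal usage sketch, assuming UniversalMCPClient can be constructed with
# no arguments and that provider/model selection happens elsewhere (both are
# assumptions about mcp_client.py, not guarantees):
#
#     from mcp_client import UniversalMCPClient
#     from chat_handler import ChatHandler
#
#     client = UniversalMCPClient()
#     handler = ChatHandler(client)
#     history, _ = handler.process_multimodal_message(
#         {"text": "hello", "files": []}, history=[]
#     )
#     for msg in history:
#         print(msg.role, msg.content)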