import sqlite3
import json
import os
import time
from typing import List, Dict, Any

import requests
from pinecone import Pinecone
from dotenv import load_dotenv

load_dotenv()

from sql.sql_utils import load_sql_query

DB_PATH = '/data/huggingface_spaces.db' if os.path.exists('/data') else 'huggingface_spaces.db'
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
TOOLS_INDEX_NAME = "vix-mcp-tools"
SPACES_INDEX_NAME = "vix-mcp-spaces"
SQL_SELECT_TOOLS = "sql/select_tools.sql"
SQL_SELECT_SPACES = "sql/select_spaces.sql"


def create_tools_index(pc: Pinecone):
    """Create Pinecone index for tools if it doesn't exist"""
    if not pc.has_index(TOOLS_INDEX_NAME):
        print(f"Creating new index: {TOOLS_INDEX_NAME}")
        pc.create_index_for_model(
            name=TOOLS_INDEX_NAME,
            cloud="aws",
            region="us-east-1",
            embed={
                "model": "llama-text-embed-v2",
                "field_map": {
                    "text": "description"
                }
            }
        )
        time.sleep(5)  # Wait for index to be ready


def create_spaces_index(pc: Pinecone):
    """Create Pinecone index for MCP spaces if it doesn't exist"""
    if not pc.has_index(SPACES_INDEX_NAME):
        print(f"Creating new index: {SPACES_INDEX_NAME}")
        pc.create_index_for_model(
            name=SPACES_INDEX_NAME,
            cloud="aws",
            region="us-east-1",
            embed={
                "model": "llama-text-embed-v2",
                "field_map": {
                    "text": "profile"
                }
            }
        )
        time.sleep(5)  # Wait for index to be ready


def fetch_space_schema(space_url: str) -> Dict[str, Any]:
    """Fetch complete schema from MCP space"""
    schema_url = f"{space_url}/gradio_api/mcp/schema"
    try:
        response = requests.get(schema_url, timeout=10)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        print(f"Error fetching schema from {schema_url}: {e}")
        return {}


def prepare_space_profile(space: Dict[str, Any]) -> str:
    """Create a comprehensive description of space capabilities"""
    descriptions = []

    # Concatenate space metadata
    descriptions.append(space['title'])
    if space['description']:
        descriptions.append(f"(* {space['description']} *)")
    if space['tags']:
        descriptions.append(f"[ {space['tags']} ]")

    # Add raw schema if available
    if space.get('schema_url'):
        try:
            response = requests.get(space['schema_url'], timeout=10)
            schema = response.json()
            descriptions.append(f"< {json.dumps(schema)} >")
        except Exception as e:
            print(f"Error fetching schema from {space['schema_url']}: {e}")

    return "\t".join(descriptions)


def load_spaces_from_db() -> List[Dict[str, Any]]:
    """Load spaces with their tools count from database"""
    query = load_sql_query(SQL_SELECT_SPACES)
    with sqlite3.connect(DB_PATH) as conn:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute(query)
        return [dict(row) for row in cursor.fetchall()]


def upsert_spaces_to_pinecone(pc: Pinecone, spaces: List[Dict[str, Any]]):
    """Upload MCP spaces to Pinecone index"""
    index = pc.Index(SPACES_INDEX_NAME)

    records = []
    for space in spaces:
        profile = prepare_space_profile(space)
        record = {
            "_id": space['space_id'],
            "profile": profile if profile else "",
            "title": space['title'] if space['title'] else "",
            "url": space['schema_url'] if space['schema_url'] else "",
            "tool_count": space['tool_count'] if space['tool_count'] else 0,
            "tags": space['tags'] if space['tags'] else '[]'
        }
        records.append(record)

    # Upsert in batches of at most 96 records
    while records:
        batch = records[:96]
        records = records[96:]
        index.upsert_records("spaces", batch)
        time.sleep(1)

    print(f"Uploaded {len(spaces)} spaces")


def search_spaces(pc: Pinecone, query: str, top_k: int = 5, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
    """Search for relevant MCP spaces based on their description and tools"""
    index = pc.Index(SPACES_INDEX_NAME)
    results = index.search(
        namespace="spaces",
        query={
            "top_k": top_k,
            "inputs": {
                "text": query
            }
        }
    )

    spaces_list = []
    for hit in results['result']['hits']:
        score = hit.get("_score", 0)
        if score > score_threshold:
            fields = hit.get('fields', {})
            space = {
                "title": fields.get("title"),
                "url": fields.get("url"),
                "tool_count": fields.get("tool_count"),
                "tags": fields.get("tags"),
                "score": score
            }
            spaces_list.append(space)

    # Sort by score in descending order
    spaces_list.sort(key=lambda x: x["score"], reverse=True)
    return spaces_list


def load_tools_from_db() -> List[Dict[str, Any]]:
    """Load tools from SQLite database"""
    query = load_sql_query(SQL_SELECT_TOOLS)
    with sqlite3.connect(DB_PATH) as conn:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute(query)
        return [dict(row) for row in cursor.fetchall()]


def upsert_tools_to_pinecone(pc: Pinecone, tools: List[Dict[str, Any]]):
    """Upload tools to Pinecone index"""
    index = pc.Index(TOOLS_INDEX_NAME)

    records = []
    for tool in tools:
        record = {
            "_id": f"{tool['space_id']}_{tool['tool_name']}",
            "description": tool['description'] if tool['description'] else "<{NO DESCRIPTION}>",
            "space_id": tool['space_id'],
            "tool_name": tool['tool_name'],
            "input_schema": json.dumps(json.loads(tool['input_schema']) if tool['input_schema'] else {}),
            "server_url": tool['server_url']
        }
        records.append(record)

    # Upsert in batches of at most 96 records
    while records:
        batch = records[:96]
        records = records[96:]
        index.upsert_records("tools", batch)
        time.sleep(1)

    print(f"Uploaded {len(tools)} tools")


def search_tools(pc: Pinecone, query: str, top_k: int = 5, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
    """Search for relevant tools based on description"""
    index = pc.Index(TOOLS_INDEX_NAME)
    results = index.search(
        namespace="tools",
        query={
            "top_k": top_k,
            "inputs": {
                "text": query
            }
        }
    )

    tools_list = []
    for hit in results['result']['hits']:
        score = hit.get("_score", 0)
        if score > score_threshold:
            fields = hit.get('fields', {})
            tool = {
                "name": fields.get("tool_name"),
                "description": fields.get("description"),
                "inputSchema": json.loads(fields.get("input_schema", "{}")),
                "server_url": fields.get("server_url"),
                "score": score
            }
            tools_list.append(tool)

    # Sort by score in descending order
    tools_list.sort(key=lambda x: x["score"], reverse=True)
    return tools_list


def search_suitable_tools(query: str) -> List[Dict[str, Any]]:
    """Search for suitable tools based on query"""
    pc = Pinecone(api_key=PINECONE_API_KEY)
    tools = search_tools(pc, query, top_k=13, score_threshold=0.25)
    return tools


def search_suitable_spaces(query: str) -> List[Dict[str, Any]]:
    """Search for suitable spaces based on query"""
    pc = Pinecone(api_key=PINECONE_API_KEY)
    spaces = search_spaces(pc, query, top_k=3, score_threshold=0.1)
    return spaces


def initialize_and_upload_to_vector_db():
    """Initialize Pinecone and upload all tools and spaces"""
    pc = Pinecone(api_key=PINECONE_API_KEY)

    create_tools_index(pc)
    create_spaces_index(pc)

    print("Loading and uploading tools...")
    tools = load_tools_from_db()
    print(f"Loaded {len(tools)} tools from database")
    upsert_tools_to_pinecone(pc, tools)

    print("\nLoading and uploading spaces...")
    spaces = load_spaces_from_db()
    print(f"Loaded {len(spaces)} spaces from database")
    upsert_spaces_to_pinecone(pc, spaces)

    print("Upload complete!")
    return pc


if __name__ == "__main__":
    pc = initialize_and_upload_to_vector_db()

    # Interactive search loop
    while query := input("Enter a query (or 'exit'/'quit' to stop): "):
        if query.lower() in ["exit", "quit"]:
            break

        relevant_tools = search_tools(pc, query)
        if relevant_tools:
print("\nFound tools:") print(json.dumps(relevant_tools, indent=2)) else: print("\nNo relevant tools found.") relevant_spaces = search_spaces(pc, query) if relevant_spaces: print("\nFound spaces:") print(json.dumps(relevant_spaces, indent=2)) else: print("\nNo relevant spaces found.") else: print("The End.")