import gradio as gr from huggingface_hub import InferenceClient import json import asyncio import subprocess import os import psycopg2 from typing import Optional, Dict, Any, List class MCPContextManager: """Model Context Protocol Manager for handling context and tools""" def __init__(self): self.context = [] self.tools = {} self.resources = {} self.db_connection = None def add_context(self, role: str, content: str, metadata: Optional[Dict] = None): """Add context entry following MCP specification""" entry = { "role": role, "content": content, "timestamp": None, "metadata": metadata or {} } self.context.append(entry) return entry def register_tool(self, name: str, description: str, parameters: Dict, handler): """Register a tool following MCP tool specification""" self.tools[name] = { "name": name, "description": description, "parameters": parameters, "handler": handler } def register_resource(self, uri: str, name: str, mime_type: str, content: Any): """Register a resource following MCP resource specification""" self.resources[uri] = { "uri": uri, "name": name, "mimeType": mime_type, "content": content } def connect_postgres(self, connection_string: str): """Connect to PostgreSQL database""" try: self.db_connection = psycopg2.connect(connection_string) return {"success": True, "message": "Connected to PostgreSQL"} except Exception as e: return {"success": False, "error": str(e)} def get_context_window(self, max_tokens: int = 4096) -> List[Dict]: """Get context window within token limits""" return self.context[-10:] async def call_tool(self, tool_name: str, arguments: Dict) -> Any: """Execute a registered tool""" if tool_name not in self.tools: raise ValueError(f"Tool {tool_name} not found") tool = self.tools[tool_name] return await tool["handler"](arguments) def get_available_tools(self) -> List[Dict]: """Get list of available tools in MCP format""" return [ { "name": tool["name"], "description": tool["description"], "parameters": tool["parameters"] } for tool in self.tools.values() ] def export_context(self) -> str: """Export context in MCP-compatible JSON format""" return json.dumps({ "context": self.context, "tools": self.get_available_tools(), "resources": list(self.resources.keys()) }, indent=2) # Initialize MCP Manager mcp_manager = MCPContextManager() # PostgreSQL Connection String (from your config) POSTGRES_CONNECTION_STRING = "postgresql://neondb_owner:npg_oGg8yphr6FeZ@ep-summer-art-a1jpcb05-pooler.ap-southeast-1.aws.neon.tech/neondb?sslmode=require" # MCP Tool Handlers async def calculator_handler(args): """Calculator tool handler""" operation = args.get("operation") a = float(args.get("a", 0)) b = float(args.get("b", 0)) operations = { "add": a + b, "subtract": a - b, "multiply": a * b, "divide": a / b if b != 0 else "Error: Division by zero" } return operations.get(operation, "Invalid operation") async def memory_handler(args): """Memory/context storage handler""" action = args.get("action") key = args.get("key") value = args.get("value") if action == "store": mcp_manager.register_resource( uri=f"memory://{key}", name=key, mime_type="text/plain", content=value ) return f"Stored {key}" elif action == "retrieve": resource = mcp_manager.resources.get(f"memory://{key}") return resource["content"] if resource else "Not found" return "Invalid action" async def web_search_handler(args): """Web search handler""" query = args.get("query") return f"Search results for: {query} (Integrated search coming soon)" async def git_command_handler(args): """Git command execution handler""" command = args.get("command", "") repo_path = args.get("repo_path", r"C:\Users\CHAN\Documents\PROJECTS\PROJECTS\MODEL\SQL") try: # Security: Only allow specific safe git commands allowed_commands = ["status", "log", "branch", "diff", "add", "commit", "push", "pull", "fetch"] cmd_parts = command.split() if not cmd_parts: return {"error": "No command provided"} # Check if first word is an allowed command base_cmd = cmd_parts[0] if base_cmd not in allowed_commands: return {"error": f"Command '{base_cmd}' not allowed. Allowed: {', '.join(allowed_commands)}"} # Execute git command result = subprocess.run( ["git"] + cmd_parts, cwd=repo_path, capture_output=True, text=True, timeout=30 ) return { "stdout": result.stdout, "stderr": result.stderr, "returncode": result.returncode, "success": result.returncode == 0, "command": f"git {command}", "repo": repo_path } except subprocess.TimeoutExpired: return {"error": "Command timed out (30s limit)"} except FileNotFoundError: return {"error": "Git not found. Make sure git is installed."} except Exception as e: return {"error": str(e)} async def postgres_query_handler(args): """PostgreSQL query handler - Direct connection to Neon DB""" query = args.get("query", "") if not query: return {"error": "No query provided"} try: # Connect to database conn = psycopg2.connect(POSTGRES_CONNECTION_STRING) cursor = conn.cursor() # Execute query cursor.execute(query) # Check if it's a SELECT query if query.strip().upper().startswith("SELECT"): results = cursor.fetchall() columns = [desc[0] for desc in cursor.description] return { "success": True, "columns": columns, "rows": results, "row_count": len(results), "data": [dict(zip(columns, row)) for row in results] } else: # For INSERT, UPDATE, DELETE, etc. conn.commit() return { "success": True, "message": "Query executed successfully", "rows_affected": cursor.rowcount } except psycopg2.Error as e: return { "success": False, "error": str(e), "error_type": "PostgreSQL Error" } except Exception as e: return { "success": False, "error": str(e), "error_type": "General Error" } finally: if 'cursor' in locals(): cursor.close() if 'conn' in locals(): conn.close() async def filesystem_handler(args): """Filesystem operations handler""" operation = args.get("operation") path = args.get("path", "") content = args.get("content", "") base_path = r"C:\Users\CHAN\Documents\PROJECTS" # Security: Ensure path is within base_path if path: full_path = os.path.normpath(os.path.join(base_path, path)) if not full_path.startswith(base_path): return {"error": "Access denied: Path outside allowed directory", "success": False} else: full_path = base_path try: if operation == "read": if os.path.exists(full_path) and os.path.isfile(full_path): with open(full_path, 'r', encoding='utf-8') as f: return {"content": f.read(), "path": full_path, "success": True} return {"error": "File not found", "success": False} elif operation == "list": if os.path.exists(full_path) and os.path.isdir(full_path): items = [] for item in os.listdir(full_path): item_path = os.path.join(full_path, item) items.append({ "name": item, "type": "directory" if os.path.isdir(item_path) else "file", "size": os.path.getsize(item_path) if os.path.isfile(item_path) else None }) return {"items": items, "count": len(items), "path": full_path, "success": True} return {"error": "Directory not found", "success": False} elif operation == "write": with open(full_path, 'w', encoding='utf-8') as f: f.write(content) return {"message": "File written successfully", "path": full_path, "success": True} elif operation == "search": pattern = args.get("pattern", "") results = [] search_path = full_path if os.path.isdir(full_path) else base_path for root, dirs, files in os.walk(search_path): for file in files: if pattern.lower() in file.lower(): results.append(os.path.join(root, file)) if len(results) >= 50: # Limit results break return {"results": results, "count": len(results), "success": True} return {"error": "Invalid operation. Use: read, write, list, or search", "success": False} except PermissionError: return {"error": "Permission denied", "success": False} except Exception as e: return {"error": str(e), "success": False} # Register all tools with MCP manager mcp_manager.register_tool( name="calculator", description="Perform basic arithmetic operations (add, subtract, multiply, divide)", parameters={ "type": "object", "properties": { "operation": {"type": "string", "enum": ["add", "subtract", "multiply", "divide"]}, "a": {"type": "number", "description": "First number"}, "b": {"type": "number", "description": "Second number"} }, "required": ["operation", "a", "b"] }, handler=calculator_handler ) mcp_manager.register_tool( name="memory", description="Store and retrieve information in session memory", parameters={ "type": "object", "properties": { "action": {"type": "string", "enum": ["store", "retrieve"]}, "key": {"type": "string", "description": "Memory key"}, "value": {"type": "string", "description": "Value to store (required for 'store')"} }, "required": ["action", "key"] }, handler=memory_handler ) mcp_manager.register_tool( name="web_search", description="Search the web for information", parameters={ "type": "object", "properties": { "query": {"type": "string", "description": "Search query"} }, "required": ["query"] }, handler=web_search_handler ) mcp_manager.register_tool( name="git", description="Execute git commands (status, log, commit, push, pull, add, branch, diff, fetch). Example: 'status' or 'commit -am \"message\"'", parameters={ "type": "object", "properties": { "command": {"type": "string", "description": "Git command (without 'git' prefix)"}, "repo_path": {"type": "string", "description": "Repository path (optional, defaults to MODEL/SQL)"} }, "required": ["command"] }, handler=git_command_handler ) mcp_manager.register_tool( name="postgres", description="Execute PostgreSQL queries on Neon database. Supports SELECT, INSERT, UPDATE, DELETE, CREATE, ALTER, etc.", parameters={ "type": "object", "properties": { "query": {"type": "string", "description": "SQL query to execute"} }, "required": ["query"] }, handler=postgres_query_handler ) mcp_manager.register_tool( name="filesystem", description="Perform filesystem operations within C:\\Users\\CHAN\\Documents\\PROJECTS", parameters={ "type": "object", "properties": { "operation": {"type": "string", "enum": ["read", "write", "list", "search"], "description": "Operation to perform"}, "path": {"type": "string", "description": "Relative path from PROJECTS folder"}, "content": {"type": "string", "description": "Content for write operation"}, "pattern": {"type": "string", "description": "Search pattern (for search operation)"} }, "required": ["operation", "path"] }, handler=filesystem_handler ) def respond( message, history: list[dict[str, str]], system_message, max_tokens, temperature, top_p, enable_mcp, hf_token: gr.OAuthToken, ): """Enhanced respond function with MCP support""" if enable_mcp: mcp_manager.add_context("user", message) client = InferenceClient(token=hf_token.token, model="openai/gpt-oss-20b") messages = [{"role": "system", "content": system_message}] if enable_mcp: tools_info = "\n\nAvailable MCP Tools:\n" + json.dumps(mcp_manager.get_available_tools(), indent=2) messages[0]["content"] += tools_info messages.extend(history) messages.append({"role": "user", "content": message}) response = "" for msg in client.chat_completion( messages, max_tokens=max_tokens, stream=True, temperature=temperature, top_p=top_p, ): choices = msg.choices token = "" if len(choices) and choices[0].delta.content: token = choices[0].delta.content response += token yield response if enable_mcp: mcp_manager.add_context("assistant", response) def call_mcp_tool(tool_name: str, arguments_json: str): """Interface for calling MCP tools from UI""" try: arguments = json.loads(arguments_json) if arguments_json.strip() else {} result = asyncio.run(mcp_manager.call_tool(tool_name, arguments)) return json.dumps({"success": True, "result": result}, indent=2) except json.JSONDecodeError as e: return json.dumps({"success": False, "error": f"Invalid JSON: {str(e)}"}, indent=2) except Exception as e: return json.dumps({"success": False, "error": str(e)}, indent=2) def export_mcp_context(): """Export current MCP context""" return mcp_manager.export_context() def get_mcp_tools_list(): """Get formatted list of available MCP tools""" tools = mcp_manager.get_available_tools() return json.dumps(tools, indent=2) def test_postgres_connection(): """Test PostgreSQL connection""" try: conn = psycopg2.connect(POSTGRES_CONNECTION_STRING) cursor = conn.cursor() cursor.execute("SELECT version();") version = cursor.fetchone()[0] cursor.close() conn.close() return json.dumps({"success": True, "message": "Connected!", "version": version}, indent=2) except Exception as e: return json.dumps({"success": False, "error": str(e)}, indent=2) # Gradio Interface chatbot = gr.ChatInterface( respond, type="messages", additional_inputs=[ gr.Textbox( value="You are a powerful AI assistant with MCP tools: Git, PostgreSQL (Neon), Filesystem, Calculator, Memory, Web Search.", label="System message", lines=3 ), gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"), gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"), gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p"), gr.Checkbox(value=True, label="Enable MCP"), ], ) with gr.Blocks(title="AI Chat with Full MCP") as demo: gr.Markdown("# 🤖 AI Chatbot with Full MCP Support") gr.Markdown("✨ PostgreSQL (Neon) â€ĸ Git â€ĸ Filesystem â€ĸ Calculator â€ĸ Memory â€ĸ Web Search") with gr.Tabs(): with gr.Tab("đŸ’Ŧ Chat"): with gr.Sidebar(): gr.LoginButton() chatbot.render() with gr.Tab("đŸ› ī¸ MCP Tools"): gr.Markdown("### đŸŽ¯ Quick Actions") with gr.Row(): with gr.Column(): gr.Markdown("**🔧 Git Commands**") git_cmd = gr.Textbox(label="Command", placeholder="status", value="status") git_btn = gr.Button("Execute Git", variant="primary") with gr.Column(): gr.Markdown("**📁 Filesystem**") fs_operation = gr.Dropdown(["list", "read", "search"], label="Operation", value="list") fs_path = gr.Textbox(label="Path", placeholder="PROJECTS", value="") fs_btn = gr.Button("Execute FS", variant="primary") with gr.Row(): with gr.Column(): gr.Markdown("**đŸ—„ī¸ PostgreSQL**") pg_query = gr.Textbox(label="SQL Query", placeholder="SELECT * FROM users LIMIT 5", lines=2) pg_btn = gr.Button("Execute SQL", variant="primary") with gr.Column(): gr.Markdown("**🔌 Test Connection**") test_pg_btn = gr.Button("Test PostgreSQL Connection") tool_result = gr.Code(language="json", label="📊 Result", lines=15) gr.Markdown("---") gr.Markdown("### 🔧 Custom Tool Call") gr.Markdown("**Available Tools:** calculator, memory, web_search, git, postgres, filesystem") with gr.Row(): tool_name_input = gr.Textbox(label="Tool Name", placeholder="calculator", value="calculator") tool_args_input = gr.Code( label="Arguments (JSON)", value='{"operation": "add", "a": 10, "b": 5}', language="json", lines=5 ) call_tool_btn = gr.Button("â–ļī¸ Call Tool", variant="secondary") # Event handlers git_btn.click( fn=lambda cmd: call_mcp_tool("git", json.dumps({"command": cmd})), inputs=[git_cmd], outputs=tool_result ) fs_btn.click( fn=lambda op, path: call_mcp_tool("filesystem", json.dumps({"operation": op, "path": path})), inputs=[fs_operation, fs_path], outputs=tool_result ) pg_btn.click( fn=lambda query: call_mcp_tool("postgres", json.dumps({"query": query})), inputs=[pg_query], outputs=tool_result ) test_pg_btn.click( fn=test_postgres_connection, outputs=tool_result ) call_tool_btn.click( fn=call_mcp_tool, inputs=[tool_name_input, tool_args_input], outputs=tool_result ) with gr.Tab("📋 MCP Context"): gr.Markdown("### Export MCP Context") export_btn = gr.Button("đŸ“Ĩ Export Context") context_display = gr.Code(language="json", label="MCP Context", lines=15) export_btn.click(fn=export_mcp_context, outputs=context_display) with gr.Tab("â„šī¸ About"): gr.Markdown(""" ### Model Context Protocol (MCP) - Full Integration #### đŸ—„ī¸ Database: PostgreSQL (Neon) - **Host:** ep-summer-art-a1jpcb05-pooler.ap-southeast-1.aws.neon.tech - **Database:** neondb - **Direct Connection:** ✅ Active #### 📁 Filesystem - **Root:** C:\\Users\\CHAN\\Documents\\PROJECTS - **Operations:** read, write, list, search #### 🔧 Tools (6): 1. **Git** - Version control operations 2. **PostgreSQL** - Database queries 3. **Filesystem** - File operations 4. **Calculator** - Math operations 5. **Memory** - Session storage 6. **Web Search** - Information retrieval #### 📖 Examples: **Git Status:** ```json {"command": "status"} ``` **Git Commit & Push:** ```json {"command": "commit -am 'Update'"} ``` Then: ```json {"command": "push"} ``` **PostgreSQL Query:** ```json {"query": "SELECT * FROM information_schema.tables LIMIT 5"} ``` **List Files:** ```json {"operation": "list", "path": "PROJECTS"} ``` """) if __name__ == "__main__": demo.launch()