"""Gradio front end for running read-only queries against a PostgreSQL database."""

import logging
import os
from contextlib import closing

import gradio as gr
import psycopg2
from dotenv import load_dotenv

load_dotenv()

logger = logging.getLogger(__name__)

READ_ONLY_KEYWORDS = ("SELECT", "SHOW", "DESCRIBE", "DESC", "EXPLAIN")


def get_connection():
    """Open a new connection to the PostgreSQL database.

    Connection settings are read from the environment variables DB_NAME,
    DB_USER, DB_HOST, DB_PASSWORD and (optionally) DB_PORT.

    Returns:
        An open psycopg2 connection, or None when the connection attempt
        fails (the error is logged).
    """
    # NOTE(review): the fallbacks below are the values that used to be
    # hard-coded in this file; they are kept only so existing deployments
    # keep working. Move them to the environment and rotate the password.
    params = {
        "dbname": os.getenv("DB_NAME", "db"),
        "user": os.getenv("DB_USER", "db"),
        "host": os.getenv("DB_HOST", "82.97.247.105"),
        "password": os.getenv("DB_PASSWORD", "db"),
    }
    port = os.getenv("DB_PORT")
    if port:
        params["port"] = port

    try:
        return psycopg2.connect(**params)
    except psycopg2.Error as exc:
        # Callers test for None, so never return anything else on failure.
        logger.error("Error connecting to database: %s", exc)
        return None


def is_read_only_query(query: str) -> bool:
    """Check whether a query looks like a single read-only statement.

    The query must start with one of READ_ONLY_KEYWORDS and must not contain
    a second statement after a semicolon.

    Args:
        query: The SQL text to inspect.

    Returns:
        True when the query passes both checks, False otherwise (including
        for empty input).
    """
    statement = query.strip().rstrip("; \t\r\n")
    if not statement:
        return False
    # Deliberately conservative: any remaining semicolon is treated as a
    # stacked statement (e.g. "SELECT 1; DROP TABLE t"), even though this
    # also rejects a semicolon inside a string literal.
    if ";" in statement:
        return False
    first_word = statement.split()[0].upper()
    return first_word in READ_ONLY_KEYWORDS


def list_tables() -> str:
    """List all user tables in the database.

    Returns:
        A formatted listing of schema and table names, or an error message.
    """
    try:
        conn = get_connection()
        if conn is None:
            return "Error: Could not connect to database"

        with closing(conn):
            with conn.cursor() as cursor:
                cursor.execute(
                    "SELECT schemaname, tablename FROM pg_catalog.pg_tables "
                    "WHERE schemaname NOT IN ('information_schema', 'pg_catalog') "
                    "ORDER BY schemaname, tablename;"
                )
                tables = cursor.fetchall()

        if not tables:
            return "No user tables found in the database"

        lines = [f"Schema: {schema}, Table: {table}\n" for schema, table in tables]
        return "Tables in database:\n\n" + "".join(lines)
    except Exception as e:
        return f"Error retrieving tables: {str(e)}"


def execute_query(query: str) -> str:
    """Execute a read-only SQL query on the database.

    Args:
        query: A single SELECT, SHOW, DESCRIBE, DESC or EXPLAIN statement.

    Returns:
        The result set formatted as text, or an error message.
    """
    if not query.strip():
        return "Error: Please enter a query"

    if not is_read_only_query(query):
        return "Error: Only read-only queries (SELECT, SHOW, DESCRIBE, DESC, EXPLAIN) are allowed"

    try:
        conn = get_connection()
        if conn is None:
            return "Error: Could not connect to database"

        with closing(conn):
            # The keyword check alone cannot stop writes hidden behind an
            # allowed keyword (e.g. "EXPLAIN ANALYZE DELETE ..."), so make
            # the server enforce read-only as well.
            conn.set_session(readonly=True)
            with conn.cursor() as cursor:
                cursor.execute(query)
                rows = cursor.fetchall()
                if not rows:
                    return "Query executed successfully, but returned no results"
                column_names = [desc[0] for desc in cursor.description]

        header = " | ".join(column_names)
        parts = [
            f"Query: {query}\n\n",
            f"Columns: {header}\n",
            "-" * (len(header) + 10) + "\n",
        ]
        parts.extend(" | ".join(str(cell) for cell in row) + "\n" for row in rows)
        parts.append(f"\nTotal rows: {len(rows)}")
        return "".join(parts)
    except Exception as e:
        return f"Error executing query: {str(e)}"


def get_table_schema(table_name: str) -> str:
    """Get column information for a specific table.

    Args:
        table_name: Name of the table to describe; surrounding whitespace
            is ignored.

    Returns:
        A formatted list of column name, data type, nullability and default
        for each column, or an error message.
    """
    table_name = table_name.strip()
    if not table_name:
        return "Error: Please enter a table name"

    try:
        conn = get_connection()
        if conn is None:
            return "Error: Could not connect to database"

        with closing(conn):
            with conn.cursor() as cursor:
                # The table name is passed as a bound parameter, never
                # interpolated into the SQL text.
                cursor.execute(
                    """
                    SELECT column_name, data_type, is_nullable, column_default
                    FROM information_schema.columns
                    WHERE table_name = %s
                    ORDER BY ordinal_position;
                    """,
                    (table_name,),
                )
                columns = cursor.fetchall()

        if not columns:
            return f"Table '{table_name}' not found or no columns found"

        parts = [
            f"Schema for table '{table_name}':\n\n",
            "Column Name | Data Type | Nullable | Default\n",
            "-" * 50 + "\n",
        ]
        parts.extend(
            f"{col[0]} | {col[1]} | {col[2]} | {col[3] or 'NULL'}\n" for col in columns
        )
        return "".join(parts)
    except Exception as e:
        return f"Error getting table schema: {str(e)}"


def test_connection() -> str:
    """Test the database connection.

    Returns:
        A success message including the server version string, or a
        failure message.
    """
    try:
        conn = get_connection()
        if conn is None:
            return "❌ Connection failed"

        with closing(conn):
            with conn.cursor() as cursor:
                cursor.execute("SELECT version();")
                version = cursor.fetchone()
        return f"✅ Connection successful!\nDatabase version: {version[0]}"
    except Exception as e:
        return f"❌ Connection failed: {str(e)}"


# Sample queries for demonstration. "SHOW TABLES" and "DESCRIBE" are MySQL
# syntax and fail on PostgreSQL, so the samples use PostgreSQL equivalents.
sample_queries = [
    "SELECT * FROM users LIMIT 5;",
    "SELECT COUNT(*) FROM users;",
    "SHOW server_version;",
    "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public';",
    "EXPLAIN SELECT * FROM users;",
]

# Create Gradio interface
with gr.Blocks(title="PostgreSQL Database Explorer", theme=gr.themes.Soft()) as interface:
    gr.Markdown("# 🗄️ PostgreSQL Database Explorer")
    gr.Markdown("Secure interface for executing read-only queries to PostgreSQL database")

    with gr.Tab("🔍 Execute Query"):
        gr.Markdown("### Execute SQL Query")
        gr.Markdown("Only read-only queries are allowed: SELECT, SHOW, DESCRIBE, DESC, EXPLAIN")

        with gr.Row():
            with gr.Column(scale=2):
                query_input = gr.Textbox(
                    label="SQL Query",
                    placeholder="Enter your SQL query here...",
                    lines=5,
                    value="SELECT * FROM users LIMIT 5;",
                )

                with gr.Row():
                    execute_btn = gr.Button("🚀 Execute Query", variant="primary")
                    clear_btn = gr.Button("🗑️ Clear")

                gr.Markdown("**Sample queries:**")
                sample_dropdown = gr.Dropdown(
                    choices=sample_queries,
                    label="Select a sample query",
                    value=None,
                )

            with gr.Column(scale=3):
                query_output = gr.Textbox(
                    label="Results",
                    lines=15,
                    max_lines=20,
                    interactive=False,
                )

    with gr.Tab("📋 Tables List"):
        gr.Markdown("### Database Tables List")

        with gr.Row():
            tables_btn = gr.Button("📋 Get Tables List", variant="primary")

        tables_output = gr.Textbox(
            label="Tables",
            lines=10,
            interactive=False,
        )

    with gr.Tab("🏗️ Table Schema"):
        gr.Markdown("### Table Structure Information")

        with gr.Row():
            with gr.Column(scale=1):
                table_name_input = gr.Textbox(
                    label="Table Name",
                    placeholder="Enter table name...",
                    value="users",
                )
                schema_btn = gr.Button("🏗️ Get Schema", variant="primary")

            with gr.Column(scale=2):
                schema_output = gr.Textbox(
                    label="Table Schema",
                    lines=10,
                    interactive=False,
                )

    with gr.Tab("🔧 Connection Test"):
        gr.Markdown("### Database Connection Test")

        with gr.Row():
            test_btn = gr.Button("🔧 Test Connection", variant="primary")

        connection_output = gr.Textbox(
            label="Connection Status",
            lines=5,
            interactive=False,
        )

    # Event handlers
    execute_btn.click(
        fn=execute_query,
        inputs=[query_input],
        outputs=[query_output],
    )

    clear_btn.click(
        fn=lambda: "",
        outputs=[query_input],
    )

    sample_dropdown.change(
        fn=lambda x: x if x else "",
        inputs=[sample_dropdown],
        outputs=[query_input],
    )

    tables_btn.click(
        fn=list_tables,
        outputs=[tables_output],
    )

    schema_btn.click(
        fn=get_table_schema,
        inputs=[table_name_input],
        outputs=[schema_output],
    )

    test_btn.click(
        fn=test_connection,
        outputs=[connection_output],
    )

# Launch the interface
if __name__ == "__main__":
    interface.launch(mcp_server=True)