sql_mcp / app.py
iamdimadem's picture
app: file
f6887ea
import gradio as gr
import psycopg2
from contextlib import closing
from dotenv import load_dotenv
load_dotenv()
READ_ONLY_KEYWORDS = ("SELECT", "SHOW", "DESCRIBE", "DESC", "EXPLAIN")
def get_connection():
"""Create a connection to the database connection"""
try:
conn = psycopg2.connect("dbname='db' user='db' host='82.97.247.105' password='db'")
return conn
except Exception as e:
return None, f"Error connecting to database: {e}"
def is_read_only_query(query: str) -> bool:
"""check if a query is read-only by examining its first word"""
if not query.strip():
return False
first_word = query.strip().split()[0].upper()
return first_word in READ_ONLY_KEYWORDS
def list_tables():
"""Get all tables from the database"""
try:
with closing(get_connection()) as conn:
if conn is None:
return "Error: Could not connect to database"
cursor = conn.cursor()
cursor.execute("SELECT * FROM pg_catalog.pg_tables WHERE schemaname NOT IN ('information_schema', 'pg_catalog');")
tables = cursor.fetchall()
if not tables:
return "No user tables found in the database"
# Format the output nicely
result = "Tables in database:\n\n"
for table in tables:
result += f"Schema: {table[0]}, Table: {table[1]}\n"
return result
except Exception as e:
return f"Error retrieving tables: {str(e)}"
def execute_query(query: str):
"""Execute a read-only SQL query on the database"""
if not query.strip():
return "Error: Please enter a query"
if not is_read_only_query(query):
return "Error: Only read-only queries (SELECT, SHOW, DESCRIBE, DESC, EXPLAIN) are allowed"
try:
with closing(get_connection()) as conn:
if conn is None:
return "Error: Could not connect to database"
cursor = conn.cursor()
cursor.execute(query)
results = cursor.fetchall()
if not results:
return "Query executed successfully, but returned no results"
# Get column names
column_names = [desc[0] for desc in cursor.description]
# Format results as a table
result = f"Query: {query}\n\n"
result += "Columns: " + " | ".join(column_names) + "\n"
result += "-" * (len(" | ".join(column_names)) + 10) + "\n"
for row in results:
result += " | ".join(str(cell) for cell in row) + "\n"
result += f"\nTotal rows: {len(results)}"
return result
except Exception as e:
return f"Error executing query: {str(e)}"
def get_table_schema(table_name: str):
"""Get schema information for a specific table"""
if not table_name.strip():
return "Error: Please enter a table name"
try:
with closing(get_connection()) as conn:
if conn is None:
return "Error: Could not connect to database"
cursor = conn.cursor()
# Get table structure
cursor.execute("""
SELECT column_name, data_type, is_nullable, column_default
FROM information_schema.columns
WHERE table_name = %s
ORDER BY ordinal_position;
""", (table_name,))
columns = cursor.fetchall()
if not columns:
return f"Table '{table_name}' not found or no columns found"
result = f"Schema for table '{table_name}':\n\n"
result += "Column Name | Data Type | Nullable | Default\n"
result += "-" * 50 + "\n"
for col in columns:
result += f"{col[0]} | {col[1]} | {col[2]} | {col[3] or 'NULL'}\n"
return result
except Exception as e:
return f"Error getting table schema: {str(e)}"
def test_connection():
"""Test database connection"""
try:
with closing(get_connection()) as conn:
if conn is None:
return "❌ Connection failed"
cursor = conn.cursor()
cursor.execute("SELECT version();")
version = cursor.fetchone()
return f"βœ… Connection successful!\nDatabase version: {version[0]}"
except Exception as e:
return f"❌ Connection failed: {str(e)}"
# Sample queries for demonstration
sample_queries = [
"SELECT * FROM users LIMIT 5;",
"SELECT COUNT(*) FROM users;",
"SHOW TABLES;",
"SELECT table_name FROM information_schema.tables WHERE table_schema = 'public';",
"DESCRIBE users;"
]
# Create Gradio interface
with gr.Blocks(title="PostgreSQL Database Explorer", theme=gr.themes.Soft()) as interface:
gr.Markdown("# πŸ—„οΈ PostgreSQL Database Explorer")
gr.Markdown("Secure interface for executing read-only queries to PostgreSQL database")
with gr.Tab("πŸ” Execute Query"):
gr.Markdown("### Execute SQL Query")
gr.Markdown("Only read-only queries are allowed: SELECT, SHOW, DESCRIBE, DESC, EXPLAIN")
with gr.Row():
with gr.Column(scale=2):
query_input = gr.Textbox(
label="SQL Query",
placeholder="Enter your SQL query here...",
lines=5,
value="SELECT * FROM users LIMIT 5;"
)
with gr.Row():
execute_btn = gr.Button("πŸš€ Execute Query", variant="primary")
clear_btn = gr.Button("πŸ—‘οΈ Clear")
gr.Markdown("**Sample queries:**")
sample_dropdown = gr.Dropdown(
choices=sample_queries,
label="Select a sample query",
value=None
)
with gr.Column(scale=3):
query_output = gr.Textbox(
label="Results",
lines=15,
max_lines=20,
interactive=False
)
with gr.Tab("πŸ“‹ Tables List"):
gr.Markdown("### Database Tables List")
with gr.Row():
tables_btn = gr.Button("πŸ“‹ Get Tables List", variant="primary")
tables_output = gr.Textbox(
label="Tables",
lines=10,
interactive=False
)
with gr.Tab("πŸ—οΈ Table Schema"):
gr.Markdown("### Table Structure Information")
with gr.Row():
with gr.Column(scale=1):
table_name_input = gr.Textbox(
label="Table Name",
placeholder="Enter table name...",
value="users"
)
schema_btn = gr.Button("πŸ—οΈ Get Schema", variant="primary")
with gr.Column(scale=2):
schema_output = gr.Textbox(
label="Table Schema",
lines=10,
interactive=False
)
with gr.Tab("πŸ”§ Connection Test"):
gr.Markdown("### Database Connection Test")
with gr.Row():
test_btn = gr.Button("πŸ”§ Test Connection", variant="primary")
connection_output = gr.Textbox(
label="Connection Status",
lines=5,
interactive=False
)
# Event handlers
execute_btn.click(
fn=execute_query,
inputs=[query_input],
outputs=[query_output]
)
clear_btn.click(
fn=lambda: "",
outputs=[query_input]
)
sample_dropdown.change(
fn=lambda x: x if x else "",
inputs=[sample_dropdown],
outputs=[query_input]
)
tables_btn.click(
fn=list_tables,
outputs=[tables_output]
)
schema_btn.click(
fn=get_table_schema,
inputs=[table_name_input],
outputs=[schema_output]
)
test_btn.click(
fn=test_connection,
outputs=[connection_output]
)
# Launch the interface
if __name__ == "__main__":
interface.launch(mcp_server=True)