|
import gradio as gr |
|
import psycopg2 |
|
from contextlib import closing |
|
from dotenv import load_dotenv |
|
|
|
# Load settings from a .env file (if present) into the process environment.
load_dotenv()

# Leading SQL keywords that is_read_only_query() accepts.
READ_ONLY_KEYWORDS = (
    "SELECT",
    "SHOW",
    "DESCRIBE",
    "DESC",
    "EXPLAIN",
)
|
|
|
|
|
def get_connection():
    """Open a new connection to the PostgreSQL database.

    Connection settings are read from the environment variables DB_NAME,
    DB_USER, DB_HOST and DB_PASSWORD; any variable that is not set falls
    back to the value that used to be hard-coded here.

    Returns:
        An open psycopg2 connection, or None if the connection attempt
        failed. The failure is logged rather than raised, because callers
        test the result with ``is None``.
    """
    import logging
    import os

    try:
        return psycopg2.connect(
            dbname=os.getenv("DB_NAME", "db"),
            user=os.getenv("DB_USER", "db"),
            host=os.getenv("DB_HOST", "82.97.247.105"),
            password=os.getenv("DB_PASSWORD", "db"),
        )
    except Exception:
        # Always signal failure with a bare None (never a tuple) so the
        # callers' ``conn is None`` checks actually work.
        logging.getLogger(__name__).exception("Error connecting to database")
        return None
|
|
|
|
|
def is_read_only_query(query: str) -> bool:
    """Return True if *query* looks like a single read-only SQL statement.

    This is a syntactic pre-filter, not a security boundary: it only looks
    at the leading keyword, so it cannot detect side effects hidden inside
    an otherwise allowed statement (for example ``SELECT ... INTO`` or a
    function call that writes).

    Args:
        query: SQL text to check.

    Returns:
        True when the text holds exactly one statement whose leading keyword
        is in READ_ONLY_KEYWORDS and which is not an ``EXPLAIN ANALYZE``;
        False otherwise, including for None, empty or whitespace-only input.
    """
    import re

    if not query or not query.strip():
        return False

    # Trailing semicolons are harmless; a ";" anywhere else means another
    # statement could follow the allowed one ("SELECT 1; DROP TABLE t").
    # A ";" inside a string literal or comment is rejected as well, which is
    # deliberately conservative.
    statement = query.strip().rstrip("; \t\r\n")
    if ";" in statement:
        return False

    # Take the leading identifier so "SELECT*FROM t" and "SELECT(1)" are
    # recognised even without whitespace after the keyword.
    leading = re.match(r"[A-Za-z_]\w*", statement)
    if leading is None:
        return False

    keyword = leading.group(0).upper()
    if keyword not in READ_ONLY_KEYWORDS:
        return False

    # EXPLAIN ANALYZE (also spelled ANALYSE, also as a parenthesised option)
    # really executes the statement it explains, so it is not read-only.
    if keyword == "EXPLAIN" and re.search(r"\bANALY[SZ]E\b", statement, re.IGNORECASE):
        return False

    return True
|
|
|
|
|
def list_tables():
    """List all user tables in the database.

    Returns:
        A text listing with one "Schema: ..., Table: ..." line per table
        (system schemas excluded, sorted by schema and table name), or a
        message describing why no listing could be produced.
    """
    try:
        conn = get_connection()
        # get_connection() reports failure without raising (None, or a
        # (None, message) tuple), so check before handing it to closing(),
        # which would otherwise call .close() on a non-connection.
        if conn is None or isinstance(conn, tuple):
            return "Error: Could not connect to database"

        with closing(conn), conn.cursor() as cursor:
            # Name the columns explicitly instead of relying on the column
            # order of SELECT *.
            cursor.execute(
                "SELECT schemaname, tablename FROM pg_catalog.pg_tables "
                "WHERE schemaname NOT IN ('information_schema', 'pg_catalog') "
                "ORDER BY schemaname, tablename;"
            )
            tables = cursor.fetchall()
    except Exception as e:
        return f"Error retrieving tables: {str(e)}"

    if not tables:
        return "No user tables found in the database"

    lines = [f"Schema: {schema}, Table: {table}" for schema, table in tables]
    return "Tables in database:\n\n" + "\n".join(lines) + "\n"
|
|
|
|
|
def execute_query(query: str) -> str:
    """Execute a read-only SQL query and return the result as text.

    Args:
        query: A single SQL statement beginning with SELECT, SHOW, DESCRIBE,
            DESC or EXPLAIN.

    Returns:
        The query, the column names, one " | "-separated line per row and a
        row count; or an error / "no results" message. Never raises for
        database errors - they are returned as text.
    """
    if query is None or not query.strip():
        return "Error: Please enter a query"

    if not is_read_only_query(query):
        return "Error: Only read-only queries (SELECT, SHOW, DESCRIBE, DESC, EXPLAIN) are allowed"

    try:
        conn = get_connection()
        # get_connection() reports failure without raising (None, or a
        # (None, message) tuple), so check before handing it to closing(),
        # which would otherwise call .close() on a non-connection.
        if conn is None or isinstance(conn, tuple):
            return "Error: Could not connect to database"

        with closing(conn):
            # Defence in depth: the keyword check is only a heuristic, so
            # also ask the server to run this session read-only.
            conn.set_session(readonly=True)
            with conn.cursor() as cursor:
                cursor.execute(query)
                # description is None when the statement produced no result
                # set; fetchall() would raise in that case.
                if cursor.description is None:
                    return "Query executed successfully, but returned no results"
                column_names = [desc[0] for desc in cursor.description]
                rows = cursor.fetchall()
    except Exception as e:
        return f"Error executing query: {str(e)}"

    if not rows:
        return "Query executed successfully, but returned no results"

    header = " | ".join(column_names)
    lines = [
        f"Query: {query}",
        "",
        f"Columns: {header}",
        "-" * (len(header) + 10),
    ]
    lines.extend(" | ".join(str(cell) for cell in row) for row in rows)
    lines.append("")
    lines.append(f"Total rows: {len(rows)}")
    return "\n".join(lines)
|
|
|
|
|
def get_table_schema(table_name: str) -> str:
    """Describe the columns of a table.

    Args:
        table_name: Name of the table to look up in
            information_schema.columns. Surrounding whitespace is ignored.

    Returns:
        A text table with one "name | type | nullable | default" line per
        column, or a message describing why no schema could be produced.
    """
    if table_name is None or not table_name.strip():
        return "Error: Please enter a table name"

    # Look up the trimmed name: the emptiness check above already ignores
    # surrounding whitespace, so the query must as well.
    table_name = table_name.strip()

    try:
        conn = get_connection()
        # get_connection() reports failure without raising (None, or a
        # (None, message) tuple), so check before handing it to closing(),
        # which would otherwise call .close() on a non-connection.
        if conn is None or isinstance(conn, tuple):
            return "Error: Could not connect to database"

        with closing(conn), conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT column_name, data_type, is_nullable, column_default
                FROM information_schema.columns
                WHERE table_name = %s
                ORDER BY ordinal_position;
                """,
                (table_name,),
            )
            columns = cursor.fetchall()
    except Exception as e:
        return f"Error getting table schema: {str(e)}"

    if not columns:
        return f"Table '{table_name}' not found or no columns found"

    lines = [
        f"Schema for table '{table_name}':",
        "",
        "Column Name | Data Type | Nullable | Default",
        "-" * 50,
    ]
    lines.extend(
        f"{name} | {data_type} | {nullable} | {default or 'NULL'}"
        for name, data_type, nullable, default in columns
    )
    return "\n".join(lines) + "\n"
|
|
|
|
|
def test_connection():
    """Check that the database is reachable.

    Returns:
        A success message including the server's ``version()`` string, or a
        failure message (with the error text when one is available).
    """
    try:
        conn = get_connection()
        # get_connection() reports failure without raising (None, or a
        # (None, message) tuple), so check before handing it to closing(),
        # which would otherwise call .close() on a non-connection.
        if conn is None or isinstance(conn, tuple):
            return "❌ Connection failed"

        with closing(conn), conn.cursor() as cursor:
            cursor.execute("SELECT version();")
            version = cursor.fetchone()

        # The status emoji had been mis-decoded, which split this literal
        # across two lines; it is restored as a single-line string.
        return f"✅ Connection successful!\nDatabase version: {version[0]}"
    except Exception as e:
        return f"❌ Connection failed: {str(e)}"
|
|
|
|
|
|
|
# Example statements offered in the UI dropdown. Every entry must be valid
# PostgreSQL and start with an allowed read-only keyword; MySQL-only forms
# such as "SHOW TABLES" or "DESCRIBE <table>" fail on PostgreSQL.
sample_queries = [
    "SELECT * FROM users LIMIT 5;",
    "SELECT COUNT(*) FROM users;",
    "SHOW server_version;",
    "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public';",
    "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = 'users';",
]
|
|
|
|
|
# Gradio UI: one tab per database operation, followed by the event wiring.
# NOTE(review): the source's indentation was lost, so the nesting below is
# reconstructed from the call order - confirm the layout (in particular that
# the output boxes of the "Tables List" and "Connection Test" tabs sit below
# their button rows rather than inside them).
with gr.Blocks(title="PostgreSQL Database Explorer", theme=gr.themes.Soft()) as interface:
    gr.Markdown("# ποΈ PostgreSQL Database Explorer")
    gr.Markdown("Secure interface for executing read-only queries to PostgreSQL database")

    # Tab: free-form query execution (input on the left, results on the right).
    with gr.Tab("π Execute Query"):
        gr.Markdown("### Execute SQL Query")
        gr.Markdown("Only read-only queries are allowed: SELECT, SHOW, DESCRIBE, DESC, EXPLAIN")

        with gr.Row():
            with gr.Column(scale=2):
                query_input = gr.Textbox(
                    label="SQL Query",
                    placeholder="Enter your SQL query here...",
                    lines=5,
                    value="SELECT * FROM users LIMIT 5;",
                )

                with gr.Row():
                    execute_btn = gr.Button("π Execute Query", variant="primary")
                    clear_btn = gr.Button("ποΈ Clear")

                gr.Markdown("**Sample queries:**")
                sample_dropdown = gr.Dropdown(
                    choices=sample_queries,
                    label="Select a sample query",
                    value=None,
                )

            with gr.Column(scale=3):
                query_output = gr.Textbox(label="Results", lines=15, max_lines=20, interactive=False)

    # Tab: list of user tables.
    with gr.Tab("π Tables List"):
        gr.Markdown("### Database Tables List")
        with gr.Row():
            tables_btn = gr.Button("π Get Tables List", variant="primary")
        tables_output = gr.Textbox(label="Tables", lines=10, interactive=False)

    # Tab: column listing for a single table.
    with gr.Tab("ποΈ Table Schema"):
        gr.Markdown("### Table Structure Information")
        with gr.Row():
            with gr.Column(scale=1):
                table_name_input = gr.Textbox(
                    label="Table Name",
                    placeholder="Enter table name...",
                    value="users",
                )
                schema_btn = gr.Button("ποΈ Get Schema", variant="primary")

            with gr.Column(scale=2):
                schema_output = gr.Textbox(label="Table Schema", lines=10, interactive=False)

    # Tab: connectivity check.
    with gr.Tab("π§ Connection Test"):
        gr.Markdown("### Database Connection Test")
        with gr.Row():
            test_btn = gr.Button("π§ Test Connection", variant="primary")
        connection_output = gr.Textbox(label="Connection Status", lines=5, interactive=False)

    # Event wiring: each button calls its handler and shows the returned text.
    execute_btn.click(fn=execute_query, inputs=[query_input], outputs=[query_output])

    # "Clear" empties the query box.
    clear_btn.click(fn=lambda: "", outputs=[query_input])

    # Choosing a sample copies it into the query box (empty when deselected).
    sample_dropdown.change(fn=lambda x: x or "", inputs=[sample_dropdown], outputs=[query_input])

    tables_btn.click(fn=list_tables, outputs=[tables_output])

    schema_btn.click(fn=get_table_schema, inputs=[table_name_input], outputs=[schema_output])

    test_btn.click(fn=test_connection, outputs=[connection_output])
|
|
|
|
|
# Start the app only when this file is run as a script, not when imported.
if __name__ == "__main__":
    # mcp_server=True asks Gradio to start its MCP server alongside the web UI.
    interface.launch(mcp_server=True)
|
|