import os, re, io, json, tempfile, shutil
from pathlib import Path
from datetime import datetime
import duckdb
import pandas as pd
import gradio as gr
from huggingface_hub import snapshot_download
from ctransformers import AutoModelForCausalLM
# -------------------------
# Config (tiny CPU model)
# -------------------------
# A very small chat/instruct GGUF that downloads quickly and runs on CPU.
# You can swap to any llama-family GGUF later (e.g., Qwen/Qwen2.5-1.5B-Instruct-GGUF).
GGUF_REPO = os.getenv("GGUF_REPO", "TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF")
GGUF_FILE = os.getenv("GGUF_FILE", "tinyllama-1.1b-chat-v1.0.Q5_K_M.gguf") # small & decent on CPU
MODEL_TYPE = os.getenv("MODEL_TYPE", "llama") # llama/qwen/mpt etc.
DB_PATH = "workspace.duckdb"
DATA_DIR = Path("uploads"); DATA_DIR.mkdir(exist_ok=True)
SYSTEM_PROMPT = """You are a senior analytics engineer generating SQLite/DuckDB compatible SQL ONLY.
OUTPUT FORMAT (MANDATORY):
SELECT ...
Rules:
- Return STRICTLY one SELECT statement (or a WITH ... SELECT) that runs on DuckDB.
- Put ONLY the SQL between and . No other text.
- Use ONLY tables/columns from the provided schema.
- No DDL/DML (DROP/CREATE/INSERT/UPDATE/DELETE/ALTER/PRAGMA/ATTACH).
- Add GROUP BY if needed for aggregates.
- Prefer EXTRACT(YEAR FROM col)=YYYY (or strftime('%Y', col)='YYYY') for year filters.
- Always end with LIMIT {limit}.
"""
# ------------- Model load (ctransformers) -------------
_llm = None
def get_llm():
    global _llm
    if _llm is not None:
        return _llm
    local_dir = snapshot_download(repo_id=GGUF_REPO, allow_patterns=GGUF_FILE)
    model_path = str(Path(local_dir) / GGUF_FILE)
    # tiny context window keeps memory low; tweak if needed
    _llm = AutoModelForCausalLM.from_pretrained(
        model_path,
        model_type=MODEL_TYPE,
        gpu_layers=0,  # CPU-only
        context_length=2048,
    )
    return _llm
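# get_llm() is a lazy singleton: the GGUF is downloaded once and cached by
# huggingface_hub, so repeated calls are cheap. To swap models, set the env
# vars before launch — the exact filename below is hypothetical (check the
# repo's file list for real quant names), and MODEL_TYPE must be one your
# ctransformers build actually supports:
#
#   export GGUF_REPO="Qwen/Qwen2.5-1.5B-Instruct-GGUF"
#   export GGUF_FILE="qwen2.5-1.5b-instruct-q5_k_m.gguf"
#   export MODEL_TYPE="qwen"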
# ------------------ DuckDB helpers --------------------
def _conn():
    return duckdb.connect(DB_PATH)
def _sanitize_name(name: str) -> str:
    base = re.sub(r'[^A-Za-z0-9_]', '_', name.strip())
    if re.match(r'^\d', base):
        base = '_' + base
    return base
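# A few illustrative inputs → outputs (file stems become DuckDB view names):
#   _sanitize_name("2024 sales")  -> "_2024_sales"   (leading digit gets prefixed)
#   _sanitize_name("order-items") -> "order_items"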
def preload_samples():
    con = _conn()
    data_dir = Path("data")
    if data_dir.exists():
        for fn in data_dir.glob("*.csv"):
            tname = _sanitize_name(fn.stem)
            con.sql(f"CREATE OR REPLACE VIEW {tname} AS SELECT * FROM read_csv_auto('{str(fn)}', header=True, AUTO_DETECT=TRUE)")
    con.close()
def load_files(files):
    con = _conn()
    # Drop prior views (keep DB)
    for row in con.sql("SELECT table_name FROM information_schema.tables WHERE table_schema='main'").fetchall():
        t = row[0]
        try:
            con.sql(f"DROP VIEW IF EXISTS {t}")
        except Exception:
            pass
    created = []
    for f in (files or []):
        # Gradio File can be dict/NamedString/path
        if isinstance(f, (str, Path)):
            src = Path(f)
        elif isinstance(f, dict) and "name" in f:
            src = Path(f["name"])
        elif hasattr(f, "name"):
            src = Path(f.name)
        else:
            return None, f"Unsupported upload object: {type(f)}"
        if not src.exists():
            return None, f"Uploaded file not found on disk: {src}"
        dest = DATA_DIR / src.name
        shutil.copyfile(src, dest)
        tname = _sanitize_name(dest.stem)
        ext = dest.suffix.lower()
        try:
            if ext == ".csv":
                con.sql(f"CREATE OR REPLACE VIEW {tname} AS SELECT * FROM read_csv_auto('{str(dest)}', header=True, AUTO_DETECT=TRUE)")
            elif ext in {".parquet", ".pq"}:
                con.sql(f"CREATE OR REPLACE VIEW {tname} AS SELECT * FROM parquet_scan('{str(dest)}')")
            else:
                return None, f"Unsupported file type: {dest.name}"
            created.append(tname)
        except Exception as e:
            return None, f"DuckDB view creation failed for {dest.name}: {e}"
    con.close()
    return created, None
def list_schema_lines():
    con = _conn()
    rows = con.sql("SELECT table_name FROM information_schema.tables WHERE table_schema='main' ORDER BY table_name").fetchall()
    lines = []
    for (t,) in rows:
        cols = con.sql(f"PRAGMA table_info('{t}')").fetchall()
        cols_str = ", ".join([f"{c[1]} ({c[2]})" for c in cols])
        lines.append(f"{t}: {cols_str}")
    con.close()
    return lines
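# Each line is "<view>: <col> (<TYPE>), ..." — e.g. (hypothetical table):
#   orders: order_id (BIGINT), customer_id (VARCHAR), order_date (DATE), quantity (BIGINT)
# This flat text doubles as the schema section of the LLM prompt below.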
def ensure_limit(sql: str, limit: int) -> str:
    if re.search(r'(?is)\blimit\s+\d+\s*$', sql.strip()):
        return sql
    return sql.strip() + f" LIMIT {int(limit)}"
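# Quick sanity checks (the regex only matches a LIMIT at the very end,
# case-insensitively):
#   ensure_limit("SELECT 1", 500)           -> "SELECT 1 LIMIT 500"
#   ensure_limit("SELECT 1 LIMIT 10", 500)  -> "SELECT 1 LIMIT 10"  (unchanged)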
def sanitize_sql(sql):
    if not sql:
        return None, "Empty SQL from model."
    s = sql.strip()
    s = re.sub(r"^```[a-zA-Z]*|```$", "", s).strip()
    s = s.split(";")[0].strip()
    if not re.match(r"(?is)^(with|select)\s", s):
        return None, "Generated SQL is not a SELECT/WITH query."
    banned = r"(?i)\b(drop|alter|insert|update|delete|attach|detach|vacuum|pragma|create|replace)\b"
    if re.search(banned, s):
        return None, "Query contains non-SELECT keywords."
    return s, None
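# Example of what the scrubbing tolerates — a fenced, multi-statement reply
# still reduces to its first SELECT:
#   sanitize_sql("```sql\nSELECT 1; DROP TABLE x;\n```")  -> ("SELECT 1", None)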
def run_sql(sql):
    try:
        con = _conn()
        df = con.sql(sql).df()
        con.close()
        return df, None
    except Exception as e:
        return None, f"SQL error: {e}"
def fewshot_prompt(schema_lines, question, limit):
    schema_text = "\n".join(f"- {ln}" for ln in schema_lines)
    shots = f"""
Q: total orders by status in 2025
<SQL>
WITH t AS (
SELECT * FROM orders WHERE EXTRACT(YEAR FROM order_date)=2025
)
SELECT status, COUNT(*) AS total
FROM t
GROUP BY status
ORDER BY total DESC
LIMIT {limit}
</SQL>
Q: top 5 customers by quantity in 2025
<SQL>
WITH t AS (
SELECT * FROM orders WHERE EXTRACT(YEAR FROM order_date)=2025
)
SELECT customer_id, SUM(quantity) AS qty
FROM t
GROUP BY customer_id
ORDER BY qty DESC
LIMIT {limit}
</SQL>
"""
    return (
        SYSTEM_PROMPT.format(limit=limit)
        + "\n\nSCHEMA (DuckDB):\n"
        + schema_text
        + "\n\nFew-shot examples:\n"
        + shots
        + "\nQ: "
        + question.strip()
        + "\n"
    )
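# The assembled prompt ends with the bare user question ("...\nQ: <question>\n"),
# so the model's next tokens should be an opening <SQL> tag, mirroring the shots.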
def extract_sql_span(generated: str, limit: int) -> str | None:
    """
    Try the <SQL> ... </SQL> span; else regex the first SELECT/WITH through LIMIT n.
    """
    g = generated.strip()
    # 1) Between tags
    if "<SQL>" in g:
        after = g.split("<SQL>", 1)[1]
        # Stop tokens may omit the closing tag; still cut at </SQL> if present
        if "</SQL>" in after:
            after = after.split("</SQL>", 1)[0]
        g = after.strip()
    # 2) Regex: first SELECT/WITH ... LIMIT
    m = re.search(r"(?is)\b(with\s+.*?select|select)\b.*?\blimit\s+\d+\b", g)
    if m:
        return m.group(0).strip()
    # 3) Last resort: first line that looks like SQL
    lines = [ln for ln in g.splitlines() if not re.match(r"^\s*(Q:|SQL:)", ln)]
    g2 = "\n".join(lines).strip()
    m2 = re.search(r"(?is)\b(with\s+.*?select|select)\b.*", g2)
    if m2:
        # ensure a LIMIT at the end
        sql = m2.group(0).strip()
        if not re.search(r"(?is)\blimit\s+\d+\s*$", sql):
            sql += f" LIMIT {int(limit)}"
        return sql
    return None
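# Behavior sketch: given a raw completion such as
#   "<SQL>\nSELECT a FROM t LIMIT 10"    (closing tag eaten by a stop token)
# step 1 strips the opening tag, step 2's regex captures through "LIMIT 10",
# and the function returns "SELECT a FROM t LIMIT 10".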
def llm_generate_sql(schema_lines, question, limit):
    try:
        llm = get_llm()
        prompt = fewshot_prompt(schema_lines, question, limit)
        text = llm(
            prompt,
            max_new_tokens=192,
            temperature=0.0,
            repetition_penalty=1.05,
            # Stop if the model tries to start another example or closes the tag:
            stop=["</SQL>", "\nQ:", "\n\nQ:", "\nSQL:", "\n\nSQL:", "\nUser:"],
        )
        text = str(text).strip()
        sql_candidate = extract_sql_span(text, limit)
        if not sql_candidate:
            return None, "could not extract SQL"
        return sql_candidate, None
    except Exception as e:
        return None, f"LLM error: {e}"
def simple_fallback(schema_lines, question, limit):
    """Very small pattern fallback for common asks."""
    q = question.lower()
    tables = [ln.split(":")[0] for ln in schema_lines] if schema_lines else []
    # Try to guess a sensible default table:
    default_table = "orders" if "orders" in tables else (tables[0] if tables else "main")
    # Example: "top 5 customers by quantity in 2025"
    m = re.search(r"top\s+(\d+)", q)
    topn = int(m.group(1)) if m else 5
    year_m = re.search(r"(?:in|for)\s+(20\d{2})", q)
    year = year_m.group(1) if year_m else None
    if "top" in q and "customer" in q and "quantity" in q:
        year_filter = f"WHERE EXTRACT(YEAR FROM order_date)={year}\n" if year and "orders" in tables else ""
        return f"""WITH t AS (
SELECT * FROM {default_table}
{year_filter.strip()}
)
SELECT customer_id, SUM(quantity) AS qty
FROM t
GROUP BY customer_id
ORDER BY qty DESC
LIMIT {min(topn, int(limit))}
""", None
    if "distinct" in q and "product" in q:
        return f"SELECT DISTINCT product FROM {default_table} LIMIT {int(limit)}", None
    if "count" in q and "orders" in q:
        y = f" WHERE EXTRACT(YEAR FROM order_date)={year}" if year and "orders" in tables else ""
        return f"SELECT COUNT(*) AS total_orders FROM {default_table}{y} LIMIT 1", None
    # Final fallback: preview first table
    return f"SELECT * FROM {default_table} LIMIT {int(limit)}", "(rule-based fallback)"
# ----------------- Gradio UI + handlers -----------------
def app_init():
    if not Path(DB_PATH).exists():
        preload_samples()
    schema = list_schema_lines()
    return "\n".join(schema) if schema else "(no tables loaded — upload CSV/Parquet to begin)"
def on_load(files):
    created, err = load_files(files)
    if err:
        return f"Error: {err}", None, None, ""
    schema = list_schema_lines()
    return "\n".join(schema), None, None, None
def on_ask(question, limit):
    schema = list_schema_lines()
    if not schema:
        return "No tables loaded.", None, None, None
    sql_text, err = llm_generate_sql(schema, question, limit)
    info = ""
    if err or not sql_text:
        sql_text, info = simple_fallback(schema, question, limit)
    sql_sanitized, serr = sanitize_sql(sql_text)
    if serr:
        return f"Bad SQL: {serr}\n---\n{sql_text}", None, None, None
    sql_limited = ensure_limit(sql_sanitized, limit)
    df, rerr = run_sql(sql_limited)
    if rerr:
        return f"SQL Error: {rerr}\n---\n{sql_limited}", None, None, None
    # Temp CSV for download
    csv_bytes = df.to_csv(index=False).encode("utf-8")
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv")
    tmp.write(csv_bytes)
    tmp.flush()
    tmp.close()
    return (
        sql_limited + (("\n\n" + info) if info else ""),
        df.head(min(int(limit), 2000)),
        gr.update(value=tmp.name, visible=True),  # reveal the hidden download component
        None,
    )
with gr.Blocks(title="NL → SQL (CPU, no keys)", css="footer.svelte-1ipelgc{display:none}") as demo:
    gr.Markdown(
        "## Natural Language → SQL — DuckDB (CPU, no API keys)\n"
        "Upload CSV/Parquet, ask in English, get SQL & results. Uses a tiny GGUF instruct model locally with ctransformers."
    )
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("**1) Load data** — Upload one or more files (*.csv, *.parquet).")
            files = gr.File(label="Upload CSV/Parquet", file_count="multiple", file_types=[".csv", ".parquet"])
            load_btn = gr.Button("Load Database", variant="secondary")
            schema_box = gr.Textbox(label="Detected Schema (DuckDB)", value=app_init(), lines=10, interactive=False)
        with gr.Column(scale=1):
            gr.Markdown("**2) Ask a question** — The agent converts English → SQL.")
            q = gr.Textbox(label="Your question", placeholder="e.g., top 5 customers by quantity in 2025", lines=3)
            limit = gr.Slider(label="Row limit", minimum=10, maximum=10000, value=500, step=10)
            ask_btn = gr.Button("Generate SQL & Run", variant="primary")
    sql_out = gr.Code(label="Generated SQL")
    df_out = gr.Dataframe(label="Results (preview)", interactive=False, visible=True)
    csv_dl = gr.File(label="Download results as CSV", visible=False)
    note = gr.Markdown("Tip: You can change the model via environment vars `GGUF_REPO` and `GGUF_FILE` (Space → Settings).", visible=True)
    load_btn.click(fn=on_load, inputs=[files], outputs=[schema_box, df_out, csv_dl, note])
    ask_btn.click(fn=on_ask, inputs=[q, limit], outputs=[sql_out, df_out, csv_dl, note])
if __name__ == "__main__":
    demo.queue(max_size=32).launch()