"""Natural-language → SQL agent: DuckDB + a tiny local GGUF model (CPU, no API keys).

Upload CSV/Parquet files, ask a question in plain English, and the app asks a
small instruct model to emit one DuckDB-compatible SELECT, sanitizes it, runs
it, and returns the results plus a CSV download.  A rule-based fallback covers
common questions when the model output cannot be parsed.
"""

import io       # noqa: F401  (kept from original import block)
import json     # noqa: F401  (kept from original import block)
import os
import re
import shutil
import tempfile
from datetime import datetime  # noqa: F401  (kept from original import block)
from pathlib import Path

import duckdb
import gradio as gr
import pandas as pd  # noqa: F401  (duckdb returns pandas frames via .df())
from ctransformers import AutoModelForCausalLM
from huggingface_hub import snapshot_download

# -------------------------
# Config (tiny CPU model)
# -------------------------
# A very small chat/instruct GGUF that downloads quickly and runs on CPU.
# You can swap to any llama-family GGUF later (e.g., Qwen/Qwen2.5-1.5B-Instruct-GGUF).
GGUF_REPO = os.getenv("GGUF_REPO", "TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF")
GGUF_FILE = os.getenv("GGUF_FILE", "tinyllama-1.1b-chat-v1.0.Q5_K_M.gguf")  # small & decent on CPU
MODEL_TYPE = os.getenv("MODEL_TYPE", "llama")  # llama/qwen/mpt etc.

DB_PATH = "workspace.duckdb"
DATA_DIR = Path("uploads")
DATA_DIR.mkdir(exist_ok=True)

# Tags the model is instructed to wrap its SQL in.
# NOTE(review): these tag literals were empty strings in the source, which made
# `g.split("", 1)` raise ValueError and silently disabled the LLM path (every
# request fell through to the rule-based fallback).  "<sql>"/"</sql>" restores
# the intended extract/stop contract — confirm against the original prompt.
SQL_TAG_OPEN = "<sql>"
SQL_TAG_CLOSE = "</sql>"

# Double braces keep {limit} as a literal placeholder for the later
# SYSTEM_PROMPT.format(limit=...) call.
SYSTEM_PROMPT = f"""You are a senior analytics engineer generating SQLite/DuckDB compatible SQL ONLY.
OUTPUT FORMAT (MANDATORY):
{SQL_TAG_OPEN}
SELECT ...
{SQL_TAG_CLOSE}
Rules:
- Return STRICTLY one SELECT statement (or a WITH ... SELECT) that runs on DuckDB.
- Put ONLY the SQL between {SQL_TAG_OPEN} and {SQL_TAG_CLOSE}. No other text.
- Use ONLY tables/columns from the provided schema.
- No DDL/DML (DROP/CREATE/INSERT/UPDATE/DELETE/ALTER/PRAGMA/ATTACH).
- Add GROUP BY if needed for aggregates.
- Prefer EXTRACT(YEAR FROM col)=YYYY (or strftime('%Y', col)='YYYY') for year filters.
- Always end with LIMIT {{limit}}.
"""

# -------------
# Model load (ctransformers)
# -------------
_llm = None  # module-level cache so the GGUF is downloaded/loaded once


def get_llm():
    """Lazily download and load the GGUF model; cache the instance in ``_llm``."""
    global _llm
    if _llm is not None:
        return _llm
    local_dir = snapshot_download(repo_id=GGUF_REPO, allow_patterns=GGUF_FILE)
    model_path = str(Path(local_dir) / GGUF_FILE)
    # tiny context window keeps memory low; tweak if needed
    _llm = AutoModelForCausalLM.from_pretrained(
        model_path,
        model_type=MODEL_TYPE,
        gpu_layers=0,          # CPU-only
        context_length=2048,
    )
    return _llm


# ------------------
# DuckDB helpers
# ------------------
def _conn():
    """Open a fresh connection to the on-disk workspace database."""
    return duckdb.connect(DB_PATH)


def _sanitize_name(name: str) -> str:
    """Turn an arbitrary file stem into a safe SQL identifier."""
    base = re.sub(r'[^A-Za-z0-9_]', '_', name.strip())
    if re.match(r'^\d', base):
        base = '_' + base
    return base


def _sql_path(p: Path) -> str:
    """Escape single quotes so a filesystem path is safe inside a SQL string literal."""
    return str(p).replace("'", "''")


def preload_samples():
    """Register every CSV in ./data as a DuckDB view (used on first launch)."""
    con = _conn()
    try:
        data_dir = Path("data")
        if data_dir.exists():
            for fn in data_dir.glob("*.csv"):
                tname = _sanitize_name(fn.stem)
                con.sql(
                    f"CREATE OR REPLACE VIEW {tname} AS "
                    f"SELECT * FROM read_csv_auto('{_sql_path(fn)}', header=True, AUTO_DETECT=TRUE)"
                )
    finally:
        con.close()


def load_files(files):
    """Replace all current views with views over the uploaded CSV/Parquet files.

    Returns ``(created_view_names, None)`` on success, ``(None, error_message)``
    on failure.  Files are copied into DATA_DIR so they survive Gradio's temp
    cleanup.
    """
    con = _conn()
    try:
        # Drop prior views (keep DB)
        for row in con.sql(
            "SELECT table_name FROM information_schema.tables WHERE table_schema='main'"
        ).fetchall():
            t = row[0]
            try:
                con.sql(f"DROP VIEW IF EXISTS {t}")
            except Exception:
                # best-effort: a name may be a base table rather than a view
                pass
        created = []
        for f in (files or []):
            # Gradio File can be dict/NamedString/path
            if isinstance(f, (str, Path)):
                src = Path(f)
            elif isinstance(f, dict) and "name" in f:
                src = Path(f["name"])
            elif hasattr(f, "name"):
                src = Path(f.name)
            else:
                return None, f"Unsupported upload object: {type(f)}"
            if not src.exists():
                return None, f"Uploaded file not found on disk: {src}"
            dest = DATA_DIR / src.name
            shutil.copyfile(src, dest)
            tname = _sanitize_name(dest.stem)
            ext = dest.suffix.lower()
            try:
                if ext == ".csv":
                    con.sql(
                        f"CREATE OR REPLACE VIEW {tname} AS "
                        f"SELECT * FROM read_csv_auto('{_sql_path(dest)}', header=True, AUTO_DETECT=TRUE)"
                    )
                elif ext in {".parquet", ".pq"}:
                    con.sql(
                        f"CREATE OR REPLACE VIEW {tname} AS "
                        f"SELECT * FROM parquet_scan('{_sql_path(dest)}')"
                    )
                else:
                    return None, f"Unsupported file type: {dest.name}"
                created.append(tname)
            except Exception as e:
                return None, f"DuckDB view creation failed for {dest.name}: {e}"
        return created, None
    finally:
        con.close()


def list_schema_lines():
    """Return one human-readable ``table: col (type), ...`` line per view/table."""
    con = _conn()
    try:
        rows = con.sql(
            "SELECT table_name FROM information_schema.tables "
            "WHERE table_schema='main' ORDER BY table_name"
        ).fetchall()
        lines = []
        for (t,) in rows:
            cols = con.sql(f"PRAGMA table_info('{t}')").fetchall()
            cols_str = ", ".join(f"{c[1]} ({c[2]})" for c in cols)
            lines.append(f"{t}: {cols_str}")
        return lines
    finally:
        con.close()


def ensure_limit(sql: str, limit: int) -> str:
    """Append ``LIMIT n`` unless the statement already ends with one."""
    if re.search(r'(?is)\blimit\s+\d+\s*$', sql.strip()):
        return sql
    return sql.strip() + f" LIMIT {int(limit)}"


def sanitize_sql(sql):
    """Validate model SQL: strip fences, keep the first statement, reject non-SELECT.

    Returns ``(clean_sql, None)`` or ``(None, error_message)``.
    """
    if not sql:
        return None, "Empty SQL from model."
    s = sql.strip()
    s = re.sub(r"^```[a-zA-Z]*|```$", "", s).strip()  # drop markdown fences
    s = s.split(";")[0].strip()                        # first statement only
    if not re.match(r"(?is)^(with|select)\s", s):
        return None, "Generated SQL is not a SELECT/WITH query."
    banned = r"(?i)\b(drop|alter|insert|update|delete|attach|detach|vacuum|pragma|create|replace)\b"
    if re.search(banned, s):
        return None, "Query contains non-SELECT keywords."
    return s, None


def run_sql(sql):
    """Execute ``sql``; return ``(DataFrame, None)`` or ``(None, error_message)``."""
    con = None
    try:
        con = _conn()
        return con.sql(sql).df(), None
    except Exception as e:
        return None, f"SQL error: {e}"
    finally:
        if con is not None:
            con.close()


def fewshot_prompt(schema_lines, question, limit):
    """Build the full prompt: system rules + schema + few-shot examples + question."""
    schema_text = "\n".join(f"- {ln}" for ln in schema_lines)
    shots = f"""
Q: total orders by status in 2025
{SQL_TAG_OPEN}
WITH t AS (
  SELECT * FROM orders
  WHERE EXTRACT(YEAR FROM order_date)=2025
)
SELECT status, COUNT(*) AS total
FROM t
GROUP BY status
ORDER BY total DESC
LIMIT {limit}
{SQL_TAG_CLOSE}

Q: top 5 customers by quantity in 2025
{SQL_TAG_OPEN}
WITH t AS (
  SELECT * FROM orders
  WHERE EXTRACT(YEAR FROM order_date)=2025
)
SELECT customer_id, SUM(quantity) AS qty
FROM t
GROUP BY customer_id
ORDER BY qty DESC
LIMIT {limit}
{SQL_TAG_CLOSE}
"""
    return (
        SYSTEM_PROMPT.format(limit=limit)
        + "\n\nSCHEMA (DuckDB):\n" + schema_text
        + "\n\nFew-shot examples:\n" + shots
        + "\nQ: " + question.strip() + "\n"
    )


def extract_sql_span(generated: str, limit: int) -> str | None:
    """
    Try the <sql> ... </sql> span; else regex the first SELECT/WITH through LIMIT n.
    """
    g = generated.strip()
    # 1) Between tags
    if SQL_TAG_OPEN in g:
        after = g.split(SQL_TAG_OPEN, 1)[1]
        # Stop tokens may omit the closing tag; still cut at it if present
        if SQL_TAG_CLOSE in after:
            after = after.split(SQL_TAG_CLOSE, 1)[0]
        g = after.strip()
    # 2) Regex: first SELECT/WITH ... LIMIT
    m = re.search(r"(?is)\b(with\s+.*?select|select)\b.*?\blimit\s+\d+\b", g, re.S)
    if m:
        return m.group(0).strip()
    # 3) Last resort: first line that looks like SQL
    lines = [ln for ln in g.splitlines() if not re.match(r"^\s*(Q:|SQL:)", ln)]
    g2 = "\n".join(lines).strip()
    m2 = re.search(r"(?is)\b(with\s+.*?select|select)\b.*", g2, re.S)
    if m2:
        # ensure a LIMIT at the end
        sql = m2.group(0).strip()
        if not re.search(r"(?is)\blimit\s+\d+\s*$", sql):
            sql += f" LIMIT {int(limit)}"
        return sql
    return None


def llm_generate_sql(schema_lines, question, limit):
    """Run the local model on the few-shot prompt; return ``(sql, None)`` or ``(None, err)``."""
    try:
        llm = get_llm()
        prompt = fewshot_prompt(schema_lines, question, limit)
        text = llm(
            prompt,
            max_new_tokens=192,
            temperature=0.0,
            repetition_penalty=1.05,
            # Stop if the model tries to start another example or closes the tag:
            stop=[SQL_TAG_CLOSE, "\nQ:", "\n\nQ:", "\nSQL:", "\n\nSQL:", "\nUser:"],
        )
        text = str(text).strip()
        sql_candidate = extract_sql_span(text, limit)
        if not sql_candidate:
            return None, "could not extract SQL"
        return sql_candidate, None
    except Exception as e:
        return None, f"LLM error: {e}"


def simple_fallback(schema_lines, question, limit):
    """Very small pattern fallback for common asks."""
    q = question.lower()
    tables = [ln.split(":")[0] for ln in schema_lines] if schema_lines else []
    # Try to guess a sensible default table:
    default_table = "orders" if "orders" in tables else (tables[0] if tables else "main")
    # Example: "top 5 customers by quantity in 2025"
    m = re.search(r"top\s+(\d+)", q)
    topn = int(m.group(1)) if m else 5
    year_m = re.search(r"(?:in|for)\s+(20\d{2})", q)
    year = year_m.group(1) if year_m else None
    if "top" in q and "customer" in q and "quantity" in q:
        year_filter = (
            f"WHERE EXTRACT(YEAR FROM order_date)={year}\n"
            if year and "orders" in tables else ""
        )
        return f"""WITH t AS (
  SELECT * FROM {default_table}
  {year_filter.strip()}
)
SELECT customer_id, SUM(quantity) AS qty
FROM t
GROUP BY customer_id
ORDER BY qty DESC
LIMIT {min(topn, int(limit))}
""", None
    if "distinct" in q and "product" in q:
        return f"SELECT DISTINCT product FROM {default_table} LIMIT {int(limit)}", None
    if "count" in q and "orders" in q:
        y = (
            f" WHERE EXTRACT(YEAR FROM order_date)={year}"
            if year and "orders" in tables else ""
        )
        return f"SELECT COUNT(*) AS total_orders FROM {default_table}{y} LIMIT 1", None
    # Final fallback: preview first table
    return f"SELECT * FROM {default_table} LIMIT {int(limit)}", "(rule-based fallback)"


# -----------------
# Gradio UI + handlers
# -----------------
def app_init():
    """Seed the DB with sample CSVs on first run; return schema text for display."""
    if not Path(DB_PATH).exists():
        preload_samples()
    schema = list_schema_lines()
    return "\n".join(schema) if schema else "(no tables loaded — upload CSV/Parquet to begin)"


def on_load(files):
    """Handler for the Load button: (re)build views from uploaded files."""
    created, err = load_files(files)
    if err:
        return f"Error: {err}", None, None, None
    schema = list_schema_lines()
    return "\n".join(schema), None, None, None


def on_ask(question, limit):
    """Handler for the Ask button: NL → SQL → results (+ downloadable CSV)."""
    schema = list_schema_lines()
    if not schema:
        return "No tables loaded.", None, None, None
    sql_text, err = llm_generate_sql(schema, question, limit)
    info = ""
    if err or not sql_text:
        sql_text, info = simple_fallback(schema, question, limit)
    sql_sanitized, serr = sanitize_sql(sql_text)
    if serr:
        return f"Bad SQL: {serr}\n---\n{sql_text}", None, None, None
    sql_limited = ensure_limit(sql_sanitized, limit)
    df, rerr = run_sql(sql_limited)
    if rerr:
        return f"SQL Error: {rerr}\n---\n{sql_limited}", None, None, None
    # Temp CSV for download
    csv_bytes = df.to_csv(index=False).encode("utf-8")
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv")
    tmp.write(csv_bytes)
    tmp.flush()
    tmp.close()
    return (
        sql_limited + (("\n\n" + info) if info else ""),
        df.head(min(int(limit), 2000)),
        tmp.name,
        None,
    )


with gr.Blocks(
    title="NL → SQL (CPU, no keys)",
    css="footer.svelte-1ipelgc{display:none}",
) as demo:
    gr.Markdown(
        "## Natural Language → SQL — DuckDB (CPU, no API keys)\n"
        "Upload CSV/Parquet, ask in English, get SQL & results. "
        "Uses a tiny GGUF instruct model locally with ctransformers."
    )
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("**1) Load data** — Upload one or more files (*.csv, *.parquet).")
            files = gr.File(
                label="Upload CSV/Parquet",
                file_count="multiple",
                file_types=[".csv", ".parquet", ".pq"],
            )
            load_btn = gr.Button("Load Database", variant="secondary")
            # interactive=False in the constructor (post-hoc attribute
            # assignment is unreliable in Gradio)
            schema_box = gr.Textbox(
                label="Detected Schema (DuckDB)",
                value=app_init(),
                lines=10,
                interactive=False,
            )
        with gr.Column(scale=1):
            gr.Markdown("**2) Ask a question** — The agent converts English → SQL.")
            q = gr.Textbox(
                label="Your question",
                placeholder="e.g., top 5 customers by quantity in 2025",
                lines=3,
            )
            limit = gr.Slider(label="Row limit", minimum=10, maximum=10000, value=500, step=10)
            ask_btn = gr.Button("Generate SQL & Run", variant="primary")
    sql_out = gr.Code(label="Generated SQL")
    df_out = gr.Dataframe(label="Results (preview)", interactive=False, visible=True)
    csv_dl = gr.File(label="Download results as CSV", visible=False)
    note = gr.Markdown(
        "Tip: You can change the model via environment vars `GGUF_REPO` and `GGUF_FILE` "
        "(Space → Settings).",
        visible=True,
    )
    load_btn.click(fn=on_load, inputs=[files], outputs=[schema_box, df_out, csv_dl, note])
    ask_btn.click(fn=on_ask, inputs=[q, limit], outputs=[sql_out, df_out, csv_dl, note])

if __name__ == "__main__":
    demo.queue(max_size=32).launch()