"""Gradio app that answers natural-language questions about an uploaded CSV.

The CSV is loaded into an in-memory SQLite table named ``data``.  A
GPT-based planner agent repeatedly writes SQL queries against that
table, executes them, and feeds the results back to the model until it
can answer the user's question in natural language (or explain why it
cannot).
"""

import datetime
import sqlite3

import gradio as gr
import pandas as pd
from openai import OpenAI

client = OpenAI()

# Maximum number of plan/execute round-trips before the planner is forced
# to produce a final answer from whatever it has gathered so far.
MAX_PLANNER_DEPTH = 5


def load_csv_to_sqlite(file_path):
    """Load a CSV file into an in-memory SQLite database.

    :param file_path: Path to the CSV file.
    :return: Tuple of (SQLite connection object, list of column headers).
    """
    df = pd.read_csv(file_path)
    headers = df.columns.tolist()
    conn = sqlite3.connect(':memory:')
    # All downstream SQL is written against a single table named 'data'.
    df.to_sql('data', conn, index=False, if_exists='replace')
    return conn, headers


def execute_sql_query(conn, query, max_rows=10, max_columns=10, truncate=True):
    """Execute *query* and return the result as a printable string.

    The result is rendered as ``{column: [values, ...]}`` so it can be
    embedded directly into an LLM prompt.  When *truncate* is true and the
    result exceeds *max_rows*/*max_columns*, the output is clipped and
    suffixed with ``OUTPUT_TRUNCATED`` so the model knows data is missing.

    :param conn: SQLite connection object.
    :param query: SQL query string.
    :param max_rows: Maximum number of rows to display. Default is 10.
    :param max_columns: Maximum number of columns to display. Default is 10.
    :param truncate: Flag to indicate whether to truncate the output.
        Default is True.
    :return: String rendering of the query results.
    """
    df = pd.read_sql_query(query, conn)
    truncated = truncate and (len(df) > max_rows or len(df.columns) > max_columns)
    if truncated:
        df = df.iloc[:max_rows, :max_columns]
    return f'{df.to_dict(orient="list")}\n\n{"OUTPUT_TRUNCATED" if truncated else ""}'


def generate_sql_query_with_gpt(headers, user_prompt):
    """Generate a single SQL query from the CSV headers and a user request.

    NOTE(review): currently unused — ``plannerAgent`` drives all query
    generation — but kept as part of the module's public surface.

    :param headers: List of column headers from the CSV.
    :param user_prompt: User's input describing the desired SQL query.
    :return: Generated SQL query string (or 'NA' per the system prompt),
        or None if the API call fails.
    """
    prompt = f"The database table has the following columns: {', '.join(headers)}.\n"
    prompt += f"Based on these columns, write a SQL query to {user_prompt}"
    messages = [
        {
            "role": "system",
            "content": (
                "You are part of a system that will help you write a SQL query "
                "based on a prompt. You should ONLY return the SQL query, or 'NA' "
                "if you cannot write a SQL query based on the prompt. The name of "
                "the table is 'data'."
            ),
        },
        {"role": "user", "content": f"User Query:\n{prompt}"},
    ]
    try:
        completion = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=messages,
        )
        return completion.choices[0].message.content
    except Exception as e:
        # Best-effort boundary: report the failure and signal it with None.
        print(f"Error while querying OpenAI: {e}")
        return None


def extract_sql_queries_from_gpt_response(response):
    """Extract labelled SQL queries from a planner response.

    Lines of the form ``SQL QUERY | <label>: <sql>`` are collected into a
    mapping of label -> SQL text.

    :param response: GPT response string.
    :return: Dict mapping labels to SQL query strings.
    """
    queries = {}
    for line in response.split('\n'):
        if line.startswith('SQL QUERY |'):
            # BUG FIX: split on the FIRST colon only — the SQL itself may
            # contain colons (e.g. time literals), which previously raised
            # "too many values to unpack".
            label, sql = line.replace('SQL QUERY |', '', 1).split(':', 1)
            queries[label.strip()] = sql.strip()
    return queries


def extract_answer_from_gpt_response(response):
    """Extract the final answer from a planner response.

    :param response: GPT response string.
    :return: The text following the first ``DONE:`` line, or None if the
        planner has not finished.
    """
    prefix = 'DONE:'
    for line in response.split('\n'):
        if line.startswith(prefix):
            # BUG FIX: slice off only the leading marker; the previous
            # .replace() also deleted any later 'DONE:' inside the answer.
            return line[len(prefix):].strip()
    return None


def plannerAgent(conn, headers, contextVariables, user_prompt,
                 prior_queries=None, depth=0):
    """Iteratively plan and execute SQL queries to answer *user_prompt*.

    Each round asks GPT-4 for either labelled SQL queries (which are
    executed and accumulated in *prior_queries*) or a final ``DONE:``
    answer.  Past ``MAX_PLANNER_DEPTH`` rounds the model is instructed to
    answer with whatever it already has.

    :param conn: SQLite connection object.
    :param headers: List of column headers from the CSV (currently unused
        here; the model sees example rows instead).
    :param contextVariables: Context variables exposed to the model
        (e.g. the current time).
    :param user_prompt: User's natural-language question.
    :param prior_queries: Accumulated {label: result-string} from earlier
        rounds; a fresh dict is created when None.
    :param depth: Current recursion depth.
    :return: Natural-language answer string, or None on API error.
    """
    # BUG FIX: the original used a mutable default (prior_queries={}),
    # which leaked query results between independent top-level calls.
    if prior_queries is None:
        prior_queries = {}
    print("Depth: ", depth)
    if depth > MAX_PLANNER_DEPTH:
        print(prior_queries)
        # Out of budget: force the model to answer from what it has.
        messages = [
            {
                "role": "system",
                "content": """You are the manager of a system that is aimed to query structured data in a database.
Return "DONE:" with the answer after. Format the answer in natural language, such that just that answer is the answer to the user prompt, and give some justication if necessary.
Also, if you cannot answer the user query, ie it is impossible to answer, maybe because the column/data does not exist, or if the prompt is irrelevant, return "DONE:" and describe why.
""",
            }
        ]
    else:
        messages = [
            {
                "role": "system",
                "content": """You are the manager of a system that is aimed to query structured data in a database.
You are given a user query and a database table row example. Based on these columns come up with a plan to query the database table to answer the user query.
These queries will be passed directly to the database and executed, so gather information that will help you answer the user query. Ensure though that the queries are isolated and do not affect each other.
Preface every individual query with 'SQL QUERY | `label`: `sql_query`' and separate multiple queries with a new line.
For example, SQL QUERY | UNIQUE_FIRST_NAME_COUNT: SELECT COUNT(DISTINCT "First Name") FROM data;
Remember to always have quotations around column names, and to ALWAYS SELECT from `data` (no other tables exist). You can use any SQL function or clause that is supported by SQLite.
This can be recursive, so you may have a store called 'PREVIOUS_QUERY_RESULTS' that contains the results of the previous queries that you've generated. No intermediate values are allowed to be stored here, this parameter just has a string representations of the previous query for reference, so no further querying of these results is allowed.
If you have enough information to answer the user query directly, without additional information or variables, based on the SQL results, then return "DONE:" with the answer after. Format the answer in natural language, such that just that answer is the answer to the user prompt, and give some justication if necessary.
Also, if you cannot answer the user query, ie it is impossible to answer because the column/data does not exist, or if the prompt is irrelevant, return "DONE:" and describe why.
""",
            }
        ]
    try:
        example_rows = execute_sql_query(
            conn, 'SELECT * FROM data LIMIT 5;', truncate=False
        )
        messages.append(
            {
                "role": "user",
                "content": f"USER_QUERY:\n{user_prompt}\n\nEXAMPLE_ROW:\n{example_rows}\n\nCONTEXT_VARIABLES:\n{contextVariables}",
            }
        )
        if len(prior_queries) > 0:
            messages.append(
                {
                    "role": "system",
                    "content": f"PREVIOUS_QUERY_RESULTS:\n{prior_queries}",
                }
            )
        completion = client.chat.completions.create(
            model="gpt-4",
            messages=messages,
        )
        plan = completion.choices[0].message.content
        print(messages)
        print(plan)
        answer = extract_answer_from_gpt_response(plan)
        if answer:
            return answer
        # SECURITY NOTE: model-generated SQL is executed verbatim.  This is
        # tolerable only because the database is an in-memory, per-request
        # copy of the user's own CSV — do not point this at shared data.
        for label, query in extract_sql_queries_from_gpt_response(plan).items():
            prior_queries[label] = execute_sql_query(conn, query)
            print(query)
        return plannerAgent(
            conn, headers, contextVariables, user_prompt, prior_queries, depth + 1
        )
    except Exception as e:
        print(f"Error while querying OpenAI: {e}")
        return None


def process_csv_and_query(csv_file, user_prompt):
    """Gradio handler: load the uploaded CSV and answer *user_prompt*.

    :param csv_file: Uploaded file object (Gradio provides ``.name``).
    :param user_prompt: User's natural-language question.
    :return: The planner's answer string, or None on error.
    """
    conn, headers = load_csv_to_sqlite(csv_file.name)
    try:
        contextVariables = {
            'currentTime': datetime.datetime.now()
        }
        # Fresh result store per request (see plannerAgent's default-arg fix).
        return plannerAgent(conn, headers, contextVariables, user_prompt, {}, 0)
    finally:
        # BUG FIX: the connection previously leaked if plannerAgent raised.
        conn.close()


# Define the Gradio interface.
iface = gr.Interface(
    fn=process_csv_and_query,
    inputs=[
        gr.File(label="Upload CSV"),
        gr.Textbox(label="Enter your query"),
    ],
    outputs=gr.Textbox(label="Query Result"),
    title="Cellica Table Querying Test",
)

if __name__ == "__main__":
    # BUG FIX: guarding launch() keeps the module importable without
    # immediately starting (and publicly sharing) the web server.
    iface.launch(share=True)