QueryHelper / queryHelperManagerCoT.py
anumaurya114exp's picture
Update queryHelperManagerCoT.py
31528fe
raw
history blame
7.72 kB
from gptManager import ChatgptManager
from utils import *
import json
import sqlparse
from constants import TABLE_RELATIONS
class QueryHelperChainOfThought:
def __init__(self, gptInstanceForCoT: ChatgptManager,
dbEngine, schemaName,
platform, metadataLayout: MetaDataLayout, sampleDataRows,
gptSampleRows, getSampleDataForTablesAndCols, tableSummaryJson='tableSummaryDict.json'):
self.gptInstanceForCoT = gptInstanceForCoT
self.schemaName = schemaName
self.platform = platform
self.metadataLayout = metadataLayout
self.sampleDataRows = sampleDataRows
self.gptSampleRows = gptSampleRows
self.getSampleDataForTablesAndCols = getSampleDataForTablesAndCols
self.dbEngine = dbEngine
self.tableSummaryJson = tableSummaryJson
self._onMetadataChange()
def _onMetadataChange(self):
metadataLayout = self.metadataLayout
sampleDataRows = self.sampleDataRows
dbEngine = self.dbEngine
schemaName = self.schemaName
selectedTablesAndCols = metadataLayout.getSelectedTablesAndCols()
self.sampleData = self.getSampleDataForTablesAndCols(dbEngine=dbEngine,schemaName=schemaName,
tablesAndCols=selectedTablesAndCols, maxRows=sampleDataRows)
def getMetadata(self) -> MetaDataLayout :
return self.metadataLayout
def updateMetadata(self, metadataLayout):
self.metadataLayout = metadataLayout
self._onMetadataChange()
def getQueryForUserInputCoT(self, userInput):
prompt = self.getPromptForCot()
self.gptInstanceForCoT.setSystemPrompt(prompt)
gptResponse = self.gptInstanceForCoT.getResponseForUserInput(userInput)
tryParsing = True
parsedSql = False
if tryParsing:
try:
txt = gptResponse.split("```json")[-1].split('```')[0].replace('\n', '')
sqlResult = json.loads(txt)['finalResult']
parsedSql = True
tryParsing = False
except:
print("Couldn't parse desired result from gpt response using method 1.")
if tryParsing:
try:
sqlResult = json.loads(gptResponse)['finalResult']
parsedSql = True
tryParsing = False
except:
print("Couldn't parse desired result from gpt response using method 2")
if parsedSql:
formattedSql = sqlparse.format(sqlResult, reindent=True)
responseToReturn = formattedSql
else:
responseToReturn = gptResponse
return responseToReturn
def getPromptForCot(self):
schemaName = self.schemaName
platform = self.platform
tableSummaryDict = json.load(open(self.tableSummaryJson, 'r'))
selectedTablesAndCols = self.metadataLayout.getSelectedTablesAndCols()
egUserInput = "I want to get top 5 product categories by state, then rank categories on decreasing order of total sales"
cotSubtaskOutput = """{
"subquery1": {
"inputSubquery": [],
"descriptioin":"calculate the total sales and assigns ranks to product categories within each state based on the descending order of sales in the tbl_f_sales table, utilizing joins with tbl_d_product and tbl_d_customer tables.",
"result": "SELECT c.state, b.category, SUM(a.transaction_amount) as total_sales,
RANK() OVER(PARTITION BY c.state ORDER BY SUM(a.transaction_amount) DESC) as category_rank
FROM lpdatamart.tbl_f_sales a
JOIN lpdatamart.tbl_d_product b
ON a.product_id = b.product_id
JOIN lpdatamart.tbl_d_customer c
ON a.customer_id = c.customer_id
GROUP BY c.state, b.category "
},
"subquery2": {
"inputSubquery": ["subquery1"],
"description":"extracts state, category, and total sales information from a subquery named "subquery1," filtering the results to include only categories with ranks up to 5 and sorting them by state and category rank."
"result":"SELECT state, category, total_sales
FROM ranked_categories
WHERE category_rank <= 5
ORDER BY state, category_rank"
},
"finalResult":"WITH subquery1 AS (
SELECT c.state, b.category, SUM(a.transaction_amount) as total_sales,
RANK() OVER(PARTITION BY c.state ORDER BY SUM(a.transaction_amount) DESC) as category_rank
FROM lpdatamart.tbl_f_sales a
JOIN lpdatamart.tbl_d_product b
ON a.product_id = b.product_id
JOIN lpdatamart.tbl_d_customer c
ON a.customer_id = c.customer_id
GROUP BY c.state, b.category
)
SELECT state, category, total_sales
FROM subquery1
WHERE category_rank <= 5
ORDER BY state, category_rank"
}"""
promptTableInfo = self.getSystemPromptForTableCols()
selectedTablesAndCols = self.metadataLayout.getSelectedTablesAndCols()
promptColumnsInfo = self.getSystemPromptForQuery(selectedTablesAndCols)
prompt = f"""You are a powerful text to sql model. Your task is to return sql query which answers
user's input. Please follow subquery structure if the sql needs to have multiple subqueries.
###example userInput is {egUserInput}. output is {cotSubtaskOutput}. Output should be in json format as provided. Only output should be in response, nothing else.\n\n
tables information are {promptTableInfo}.
columns data are {promptColumnsInfo}.
"""
prompt += f"and table Relations are {TABLE_RELATIONS}"
return prompt
def getSystemPromptForTableCols(self):
schemaName = self.schemaName
platform = self.platform
tableSummaryDict = json.load(open(self.tableSummaryJson, 'r'))
selectedTablesAndCols = self.metadataLayout.getSelectedTablesAndCols()
promptTableInfo = f"""You are a powerful text to sql model. Answer which tables and columns are needed
to answer user input using sql query. and following are tables and columns info. and example user input and result query."""
for idx, tableName in enumerate(selectedTablesAndCols.keys(), start=1):
promptTableInfo += f"table name {tableName} and summary is {tableSummaryDict[tableName]}"
promptTableInfo += f" and columns {', '.join(selectedTablesAndCols[tableName])} \n"
promptTableInfo += "XXXX"
#Join statements
promptTableInfo += f"and table Relations are {TABLE_RELATIONS}"
return promptTableInfo
def getSystemPromptForQuery(self, prospectTablesAndCols):
schemaName = self.schemaName
platform = self.platform
tableSummaryDict = json.load(open(self.tableSummaryJson,'r'))
exampleQuery = """SELECT a.customer_id, COUNT(a.product_id) as chandelier_count
FROM lpdatamart.tbl_f_sales a
JOIN lpdatamart.tbl_d_product b ON a.product_id = b.product_id
JOIN lpdatamart.tbl_d_calendar c ON a.date_id = c.date_id
WHERE UPPER(b.product_name) LIKE '%CHANDELIER%' AND c.calendar_month = 'NOVEMBER' AND c.year = 2023
GROUP BY a.customer_id
ORDER BY chandelier_count DESC"""
question = "top 5 customers who bought most chandeliers in nov 2023"
promptForQuery = f"""You are a powerful text to sql model. Answer user input with sql query. And the query needs to run on {platform}. and schemaName is {schemaName}. There is example user input and desired generated sql query. Follow similar patterns as example. eg case insensitive, explicit variable declaration etc. user input : {question}, query : {exampleQuery}. and table's data is \n"""
for idx, tableName in enumerate(prospectTablesAndCols.keys(), start=1):
promptForQuery += f"table name is {tableName}, table data is {self.sampleData[tableName][prospectTablesAndCols[tableName]].head(self.gptSampleRows)}"
promptForQuery += f"and table Relations are {TABLE_RELATIONS}"
return promptForQuery.replace("\\"," ").replace(" "," ").replace("XXXX", " ")