"""Gradio chat front-end over three data sources.

A LangChain tool-calling agent answers each question by picking one of:

* ``ASNData``       - looks up an ASN through the warehouse REST API,
* ``DocumentData``  - similarity search over a locally indexed PDF,
* ``DatabaseQuery`` - LLM-generated SQL executed against the college database.

Importing this module connects to the database and builds the PDF vector
store; the Gradio UI is only launched when the file is run as a script.
"""
import os
import re

import gradio as gr
import requests
from langchain import OpenAI  # noqa: F401  (unused in this file; kept so the import list is unchanged)
from langchain.agents import AgentExecutor, Tool, create_tool_calling_agent
from langchain.memory import ConversationBufferMemory
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.utilities import SQLDatabase
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from PyPDF2 import PdfReader

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Conversation memory handed to the AgentExecutor below.
# NOTE(review): the agent prompt has no "chat_history" placeholder, so the
# stored history is recorded but never shown to the model. Add a
# MessagesPlaceholder("chat_history") to the prompt if follow-up questions
# should see earlier turns.
memory = ConversationBufferMemory(return_messages=True, memory_key="chat_history")

# The OpenAI client reads OPENAI_API_KEY; the deployment provides OPEN_AI_API.
open_api_key_token = os.environ['OPEN_AI_API']
os.environ['OPENAI_API_KEY'] = open_api_key_token

# SECURITY(review): the fallback below embeds live credentials in source
# control. Set COLLEGEDB_URI in the environment, rotate this password, and
# then delete the literal.
db_uri = os.environ.get(
    "COLLEGEDB_URI",
    'mysql+mysqlconnector://redmindgen:51(xtzb0z_P8wRkowkDGQe@188.166.133.137:3306/collegedb',
)

# ASN REST endpoint and the fixed query parameters sent with every lookup.
ASN_API_URL = "http://193.203.162.39:9090/nxt-wms/trnHeader"
ASN_BRANCH_ID = "343"
ASN_USER_ID = "164"
# ASN used when the caller supplies no id.
DEFAULT_ASN_ID = "ASN24070100015"
# Seconds to wait for the ASN API before giving up (requests has no default).
ASN_REQUEST_TIMEOUT = 30

ASN_NOT_FOUND_MESSAGE = "ASN Details are not found. Please contact system administrator."

# Database setup
db = SQLDatabase.from_uri(db_uri)

# LLM setup
llm = ChatOpenAI(model="gpt-3.5-turbo-0125")

# ---------------------------------------------------------------------------
# SQL query tool
# ---------------------------------------------------------------------------

template_query_generation = (
    "Based on the table schema below, write a SQL query that would answer "
    "the user's question: {schema} Question: {question} SQL Query:"
)
prompt_query_generation = ChatPromptTemplate.from_template(template_query_generation)

# Matches a Markdown code fence (optionally tagged "sql") around a query.
_SQL_FENCE = re.compile(r"```(?:sql)?\s*(.*?)```", re.DOTALL | re.IGNORECASE)
# First keyword of a statement, skipping leading whitespace / parentheses.
_FIRST_KEYWORD = re.compile(r"[\s(]*([A-Za-z]+)")
# Statement types the database tool is willing to execute.
_READ_ONLY_KEYWORDS = frozenset({"select", "with", "show", "describe", "desc", "explain"})


def get_schema(_):
    """Return the table definitions of the connected database.

    The argument is ignored; it exists so the function can be used as a
    ``RunnablePassthrough.assign`` callback, which passes the chain input.
    """
    return db.get_table_info()


# Built once: schema lookup -> prompt -> LLM (stopped before it invents a
# result section) -> plain string.
sql_chain = (
    RunnablePassthrough.assign(schema=get_schema)
    | prompt_query_generation
    | llm.bind(stop=["\n SQL Result:"])
    | StrOutputParser()
)


def generate_sql_query(question):
    """Ask the LLM for a SQL query that answers ``question``.

    Args:
        question: Natural-language question about the database.

    Returns:
        The raw text produced by the model (may include Markdown fences).
    """
    return sql_chain.invoke({"question": question})


def run_query(query):
    """Execute ``query`` against the database and return ``db.run``'s result."""
    return db.run(query)


def _extract_sql(llm_output):
    """Return the SQL inside a Markdown code fence, or the stripped text."""
    fenced = _SQL_FENCE.search(llm_output)
    sql = fenced.group(1) if fenced else llm_output
    return sql.strip()


def _is_read_only(sql):
    """Return True when the statement's first keyword is a read-only one."""
    keyword = _FIRST_KEYWORD.match(sql)
    return bool(keyword) and keyword.group(1).lower() in _READ_ONLY_KEYWORDS


def database_tool(question):
    """Answer ``question`` by generating and running a SQL query.

    The generated statement comes from an LLM prompted with user input, so it
    is treated as untrusted: only read-only statements are executed.
    NOTE(review): this keyword check is a coarse guard; the database account
    itself should also be restricted to read-only privileges.

    Args:
        question: Natural-language question about the database.

    Returns:
        The query result, or a short message explaining why no result is
        available (refused statement or database error).
    """
    sql_query = _extract_sql(generate_sql_query(question))
    if not _is_read_only(sql_query):
        return "Only read-only SQL queries are allowed; the generated statement was not executed."
    try:
        return run_query(sql_query)
    except Exception as err:  # tool boundary: report to the agent instead of crashing the UI
        print(f"Database query failed: {err}")
        return f"The database query could not be executed: {err}"


# ---------------------------------------------------------------------------
# ASN API tool
# ---------------------------------------------------------------------------

def _asn_rows(content):
    """Build the label/value rows reported for one ASN content item.

    Raises:
        KeyError, IndexError, TypeError: if the item lacks the mandatory
            header fields or has no party entry.
    """
    header = content['trnHeaderAsn']
    party = content['party'][0]
    shipper = party.get('shipper') or {}
    return [
        ["Transaction UID", header['transactionUid']],
        ["Customer Order No", header.get('customerOrderNo', 'N/A')],
        ["Order Date", header.get('orderDate', 'N/A')],
        ["Customer Invoice No", header.get('customerInvoiceNo', 'N/A')],
        ["Invoice Date", header.get('invoiceDate', 'N/A')],
        ["Expected Receiving Date", header['expectedReceivingDate']],
        ["Transaction Status", header['transactionStatus']],
        ["Shipper Code", shipper.get('code', 'N/A')],
        ["Shipper Name", shipper.get('name', 'N/A')],
    ]


def get_ASN_data(asn_id):
    """Fetch the details of an ASN from the warehouse API.

    Args:
        asn_id: ASN identifier; ``None`` or a blank string selects
            ``DEFAULT_ASN_ID``.

    Returns:
        A sentence containing the ASN details, the "not found" message when
        the API returns no content, or a short error message when the request
        or the response parsing fails. Always a string, so the agent always
        receives an observation.
    """
    if asn_id is None or not asn_id.strip():
        asn_id = DEFAULT_ASN_ID
    asn_id = asn_id.strip()

    # Let requests encode the query string; asn_id originates from the LLM.
    query = {
        "branchMaster.id": ASN_BRANCH_ID,
        "transactionUid": asn_id,
        "userId": ASN_USER_ID,
        "transactionType": "ASN",
    }
    try:
        response = requests.get(ASN_API_URL, params=query, timeout=ASN_REQUEST_TIMEOUT)
        # Check the status before parsing: error pages are often not JSON.
        response.raise_for_status()
        payload = response.json()
    except requests.exceptions.HTTPError as http_err:
        print(f"HTTP error occurred: {http_err}")
        return f"The ASN service returned an error for {asn_id}. Please try again later."
    except (requests.exceptions.RequestException, ValueError) as err:
        # ValueError covers a body that is not valid JSON.
        print(f"An error occurred: {err}")
        return f"The ASN service could not be reached for {asn_id}. Please try again later."

    result = payload.get('result') if isinstance(payload, dict) else None
    content_items = result.get('content') if isinstance(result, dict) else None
    if not content_items:
        return ASN_NOT_FOUND_MESSAGE

    try:
        # Only the first content item (and its first party) is reported.
        details = _asn_rows(content_items[0])
    except (KeyError, IndexError, TypeError) as err:
        print(f"An error occurred: {err}")
        return f"The ASN service returned an unexpected response for {asn_id}."
    return f"The ASN details of {asn_id} is {details}."


# ---------------------------------------------------------------------------
# Document tool
# ---------------------------------------------------------------------------

def load_and_split_pdf(pdf_path):
    """Extract the text of a PDF and split it into overlapping chunks.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        List of text chunks produced by ``CharacterTextSplitter``
        (chunk_size=1000, chunk_overlap=200).
    """
    reader = PdfReader(pdf_path)
    # "or ''" guards against a page whose extraction yields nothing.
    text = ''.join(page.extract_text() or '' for page in reader.pages)
    text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    return text_splitter.split_text(text)


def create_vector_store(texts):
    """Embed ``texts`` with OpenAI embeddings and index them in FAISS.

    Raises:
        ValueError: if ``texts`` is empty (nothing to index).
    """
    if not texts:
        raise ValueError("No text chunks to index; the document produced no extractable text.")
    embeddings = OpenAIEmbeddings()
    return FAISS.from_texts(texts, embeddings)


def query_vector_store(vector_store, query):
    """Return the chunks most similar to ``query``, separated by blank lines."""
    docs = vector_store.similarity_search(query)
    return '\n\n'.join(doc.page_content for doc in docs)


# Load and process the PDF (ensure the PDF is accessible from your Colab environment)
pdf_path = "Inbound.pdf"
texts = load_and_split_pdf(pdf_path)
vector_store = create_vector_store(texts)
print("vector store created")


def document_data_tool(query):
    """Search the indexed PDF for passages relevant to ``query``."""
    return query_vector_store(vector_store, query)


# ---------------------------------------------------------------------------
# Agent
# ---------------------------------------------------------------------------

# ``tool_choice`` is an option of ``bind_tools``, not a Tool field, so it is
# not passed here.
tools = [
    Tool(
        name="ASNData",
        func=get_ASN_data,
        description=(
            "Tool to get the status of ASN with ASN id given as input. "
            "Handles questions related to ASN id which starts with ASN followed by 11 numeric digits. "
            "For example, ASN24070100015 "
        ),
    ),
    Tool(
        name="DocumentData",
        func=document_data_tool,
        description=(
            "Tool to search and retrieve information from the uploaded document "
            "which is related to warehousing operations. "
            "Provide responses with the maximum of 150 words."
        ),
    ),
    Tool(
        name="DatabaseQuery",
        func=database_tool,
        description=(
            "Tool to query the database based on the user's question. "
            "Only handles questions related to the collegedb schema, including tables such as "
            "buildings, classrooms, college, course, faculty, interns, person, section, student, and textbook. "
            "Ensure to use only the available fields in these tables."
            "Provide responses with the maximum of 150 words."
        ),
    ),
]

AGENT_SYSTEM_PROMPT = (
    "You are an assistant that helps with database queries, ASN API information, and document retrieval. "
    "For ASN-related questions, if the user specifies ASN id. "
    "Provide the information like ASN status, expected Receiving Date etc. "
    "For document-related questions, search and retrieve information from the uploaded document "
    "which is related to warehousing operations. "
    "For SQL database-related questions, only use the fields available in the collegedb schema, "
    "which includes tables such as buildings, classrooms, college, course, faculty, interns, person, "
    "section, student, and textbook."
)

# The scratchpad must be a messages placeholder: the agent fills it with the
# tool-call and tool-result messages of the current turn, which have to reach
# the model as messages rather than as a stringified list.
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", AGENT_SYSTEM_PROMPT),
        ("human", "Question: {input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ]
)

# Memory belongs to the executor; it is not a model parameter, so the plain
# LLM is given to the agent.
agent = create_tool_calling_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, memory=memory, verbose=True)


# ---------------------------------------------------------------------------
# Gradio interface
# ---------------------------------------------------------------------------

def answer_question(user_question):
    """Run the agent on ``user_question`` and return a single-line answer.

    Args:
        user_question: Text typed by the user.

    Returns:
        The agent's output with every run of whitespace (including newlines)
        collapsed to one space.
    """
    response = agent_executor.invoke({"input": user_question})
    if isinstance(response, dict):
        if "tool_name" in response:
            print(response["tool_name"])
        response_text = response.get("output", "")
    else:
        response_text = response
    return " ".join(str(response_text).split())


# Create the Gradio interface
iface = gr.Interface(
    fn=answer_question,
    inputs="text",
    outputs="text",
    title="Chat with your data",
    description="Ask a question about the database or API or a document and get a response in natural language.",
)

# Launch the Gradio interface
if __name__ == "__main__":
    iface.launch(share=True, debug=True)