chatbot_combined / custom_llm.py
jonathanjordan21's picture
Update custom_llm.py
0c857e8 verified
from typing import Any, List, Mapping, Optional
from langchain_core.callbacks.manager import CallbackManagerForLLMRun
from langchain_core.language_models.llms import LLM
from typing import Literal
import requests
from langchain.prompts import PromptTemplate, ChatPromptTemplate
from operator import itemgetter
from langchain.memory import ChatMessageHistory, ConversationBufferMemory
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_community.chat_models import ChatOpenAI
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_core.messages import AIMessage, HumanMessage
from langchain_community.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyMuPDFLoader
import os, requests
from langchain.embeddings import HuggingFaceEmbeddings
from langchain_community.embeddings import HuggingFaceInferenceAPIEmbeddings
from langchain.vectorstores import FAISS
from langchain_core.runnables import RunnableBranch
import pickle, asyncio, traceback
# os.environ['FAISS_NO_AVX2'] = '1'
import pandas as pd
async def create_vectorstore():
API_TOKEN = os.getenv('HF_INFER_API')
loader = os.getenv('knowledge_base')
# web_loader = load_web("https://lintasmediadanawa.com")
splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=20)
# docs = splitter.create_documents([loader]+web_loader)
docs = splitter.create_documents([loader])
print(len(docs))
emb_model = HuggingFaceEmbeddings(model_name='sentence-transformers/paraphrase-multilingual-mpnet-base-v2', encode_kwargs={'normalize_embeddings': True})
# emb_model = HuggingFaceInferenceAPIEmbeddings(
# api_key=API_TOKEN, model_name="sentence-transformers/paraphrase-multilingual-mpnet-base-v2", encode_kwargs={'normalize_embeddings': True}
# )
async def add_docs(d):
db.aadd_documents(await splitter.atransform_documents([d]))
db = await FAISS.afrom_documents(docs, emb_model)
f = pickle.load(open("wi_knowledge.dat", "rb"))
print("Docs len :", len(f))
tasks = []
for d in f:
tasks.append(db.aadd_documents(await splitter.atransform_documents([d])))
await asyncio.gather(*tasks)
# asyncio.run(db.aadd_documents(asyncio.run(splitter.atransform_documents(f))))
# emb_model = HuggingFaceInferenceAPIEmbeddings(
# api_key=API_TOKEN, model_name="sentence-transformers/paraphrase-multilingual-mpnet-base-v2", encode_kwargs={'normalize_embeddings': True}
# )
# x = open("wi_knowledge.pkl", 'rb')
# db = FAISS.deserialize_from_bytes(
# embeddings=emb_model, serialized=x
# )
# db = pickle.load(x)
# print(db)
# db.add_documents( splitter.transform_documents(docs) )
return db
def custom_chain_with_history(llm, memory):
# prompt = PromptTemplate.from_template("""<s><INST><|system|>
# You are a helpful and informative AI customer service assistant. Always remember to thank the customer when they say thank you and greet them when they greet you.
# You have access to the following context of knowledge base and internal resources to find the most relevant information for the customer's needs:
# {context}
# Respond to the user with the following chat history between you and the user:
# {chat_history}
# <|user|>
# {question}
# <|assistant|>
# """)
prompt = PromptTemplate.from_template("""<s><INST><|system|>
Anda adalah asisten AI Chatbot customer service.
Anda memiliki akses table dibawah ini untuk menemukan informasi yang paling relevan dengan kebutuhan user:
{context}
Berikan respon kepada user berdasarkan riwayat chat berikut dengan bahasa yang digunakan terakhir kali oleh user, jika tidak ada informasi yang relevan maka itu adalah informasi yang rahasia dan Anda tidak diizinkan untuk menyebarkan informasi tersebut kepada user:
{chat_history}
<|user|>
{question}
<|assistant|>
""")
def prompt_memory(memory):
t = ""
for x in memory.chat_memory.messages:
t += f"<|assistant|>\n<s>{x.content}</s>\n" if type(x) is AIMessage else f"<|user|>\n{x.content}\n"
return "" if len(t) == 0 else t
def format_docs(docs):
# print(len(docs))
return "\n".join([f"{i+1}. {d.page_content}" for i,d in enumerate(docs)])
# prompt = ChatPromptTemplate.from_messages(
# [
# ("system", "You are a helpful chatbot"),
# MessagesPlaceholder(variable_name="history"),
# ("human", "{input}"),
# ]
# )
# return {"chat_history":prompt_memory, "context":asyncio.run(create_vectorstore()).as_retriever(search_type="similarity", search_kwargs={"k": 12}) | format_docs, "question": RunnablePassthrough()} | prompt | llm
return {"chat_history":lambda x:prompt_memory(x['memory']), "context":itemgetter("question") | asyncio.run(create_vectorstore()).as_retriever(search_type="similarity", search_kwargs={"k": 12}) | format_docs, "question": lambda x:x['question']} | prompt | llm
def format_df(df):
out = ""
for x in df.columns:
out+= x + "|"
out = out[:-1] + "\n\n"
for _,row in df.iterrows():
for x in row.values:
out += str(x) + "|"
out = out[:-1]
out += "\n"
return out
def out_format(text, llm, df):
prompt = PromptTemplate.from_template("""<s><INST>Fix the following code. Do not give explanation, just create the python code:
{code}
Error Message : {err}
Always change the corresponding columns into datetime format with parameter day_first=True, example:
df['column_name'] = pd.to_datetime(df['column_name'], day_first=True)
Always use idxmin or idxmax instead of array indicies whenever it is possible
Always use .iloc to query a dataframe instead of using array indicies directly
The output must follow the following example format:
```python
# Generated Code
```
</INST></s>""")
err_chain = prompt | llm
e_ = None
for i in range(6):
try :
print(text)
text_split = text.split("`python")[-1].split("```")[0].replace('\_', "_")
# text_split = text.split("# Generated Code")[-1].split("```")[0].replace("\_", "_")
if "response" not in text_split:
text = text.split("```")[0].replace('\_', "_")
else :
text = text_split
print(text)
try :
exec(text)
except :
text_split = text.split("# Generated Code")[-1].split("```")[0].replace("\_", "_")
if "response" not in text_split:
text = text.split("```")[0].replace('\_', "_")
else :
text = "# Generated Code" + text_split
print(text)
exec(text)
return text
except Exception as e:
print(f"ERORRR! ATTEMPT : {i}\n",str(traceback.format_exc(limit=2)))
text = err_chain.invoke({"code":text, "err":str(traceback.format_exc(limit=2))})
e_ = traceback.format_exc(limit=2)
# exec(text)
return "Bad Python Code, Error Message : " + str(e_)
def unique_value_str_func(unique_val):
return "\n".join([str(i+1) + "." + k + ": " + str(v) for i,(k,v) in enumerate(unique_val.items())])
def custom_dataframe_chain(llm, df, unique_values):
unique_str = unique_value_str_func(unique_values)
print(unique_str)
prompt = PromptTemplate.from_template("""<s><INST>You have access to a pandas dataframe variable named "df". Below are the examples of the dataframe:
{df_example}
Given the following user input, create relevant python code to get the relevant information in the dataframe and store the response string result in a variable named "response". Do not explain, just create the python code:
{question}
Always change the corresponding columns into datetime format with parameter day_first=True, example:
df['column_name'] = pd.to_datetime(df['column_name'], day_first=True)
Always use idxmin or idxmax instead of array indicies whenever it is possible
Do not import pandas and Do not create or re-assign "df" variable
Below is the unique value of the important categorical columns:
{unique_val}
The output must follow the following example format:
```python
# Generated Code
```
</INST></s>""").partial(unique_val=unique_str)
return prompt | llm | RunnableLambda(lambda x:out_format(x, llm, df))
def custom_unique_df_chain(llm, df):
prompt = PromptTemplate(template="""<s><INST>You have access to a pandas dataframe variable named "df". Below are the examples of the dataframe:
{df_example}
Create unique values for the important non-datetime categorical columns with maximum 20 unique values for each columns. Store the unique values in a variable named "response" with the following format of python dictionary:
{{ column_name1 : [list_of_unique_column1], column_name2 : [list_of_unique_values_column2], column_name3 : [list_of_unique_values_column3] }}
The output must follow the following example format:
```python
# Generated Code
```
</INST></s>""", input_variables=["df_example"])
return prompt | llm | RunnableLambda(lambda x:out_format(x, llm, df))
def custom_combined_chain(llm, df_chain, memory_chain):
# prompt = PromptTemplate.from_template("""<s><INST> Given the following question, classify it as either being more relevant with a dataframe object of ticket submissions' history or several documents of user guide and general knowledge:
# <question>
# {question}
# </question>
# Respond with ONLY one word either "ticket" or "knowledge"
# </s></INST>""")
prompt = PromptTemplate.from_template("""<s><INST> You have access to the following data sources:
1. Dataframe : use this data source to retrieve anything about ticket submission history
2. Documents : use this data source to retrieve anything related to user guide and work instruction or any other question not related to ticket submission history
<question>
{question}
</question>
Respond with ONLY one word either "dataframe" or "documents"
</s></INST>
""")
# def route(info):
# if 'ticket' in info['topic']:
# return df_chain
# else:
# return memory_chain
# full_chain = RunnablePassthrough.assign(topic= (prompt | llm)) | RunnableLambda(route)
# combined_chain = prompt | llm
return RunnablePassthrough.assign(topic=prompt | llm) | RunnableBranch( (lambda x: "dataframe" in x['topic'].lower(), df_chain), memory_chain )
class CustomLLM(LLM):
repo_id : str
api_token : str
model_type: Literal["text2text-generation", "text-generation"]
max_new_tokens: int = None
temperature: float = 0.001
timeout: float = None
top_p: float = None
top_k : int = None
repetition_penalty : float = None
stop : List[str] = []
@property
def _llm_type(self) -> str:
return "custom"
def _call(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> str:
headers = {"Authorization": f"Bearer {self.api_token}"}
API_URL = f"https://api-inference.huggingface.co/models/{self.repo_id}"
parameters_dict = {
'max_new_tokens': self.max_new_tokens,
'temperature': self.temperature,
'timeout': self.timeout,
'top_p': self.top_p,
'top_k': self.top_k,
'repetition_penalty': self.repetition_penalty,
'stop':self.stop
}
if self.model_type == 'text-generation':
parameters_dict["return_full_text"]=False
data = {"inputs": prompt, "parameters":parameters_dict, "options":{"wait_for_model":True}}
data = requests.post(API_URL, headers=headers, json=data).json()
return data[0]['generated_text']
@property
def _identifying_params(self) -> Mapping[str, Any]:
"""Get the identifying parameters."""
return {
'repo_id': self.repo_id,
'model_type':self.model_type,
'stop_sequences':self.stop,
'max_new_tokens': self.max_new_tokens,
'temperature': self.temperature,
'timeout': self.timeout,
'top_p': self.top_p,
'top_k': self.top_k,
'repetition_penalty': self.repetition_penalty
}