chatbot_v2 / custom_llm.py
jonathanjordan21's picture
Update custom_llm.py
1663093 verified
from typing import Any, List, Mapping, Optional
from langchain_core.callbacks.manager import CallbackManagerForLLMRun
from langchain_core.language_models.llms import LLM
from typing import Literal
import requests
from langchain.prompts import PromptTemplate, ChatPromptTemplate
from operator import itemgetter
from langchain.memory import ChatMessageHistory, ConversationBufferMemory
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_community.chat_models import ChatOpenAI
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_core.messages import AIMessage, HumanMessage
from langchain_community.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyMuPDFLoader
import os, requests, bs4
from langchain.embeddings import HuggingFaceEmbeddings
from langchain_community.embeddings import HuggingFaceInferenceAPIEmbeddings
from langchain.vectorstores import FAISS
import pickle, asyncio
# os.environ['FAISS_NO_AVX2'] = '1'
def load_web(web_url, timeout=30):
    """Fetch a web page and return the distinct texts of its ``<div>`` elements.

    The page is downloaded with ``requests`` and parsed with BeautifulSoup's
    ``html.parser``. The stripped text of every ``<div>`` is collected, empty
    strings are discarded, and duplicates are removed while keeping the order
    in which each text first appears.

    Args:
        web_url: URL of the page to download.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely.

    Returns:
        A list of unique, non-empty, whitespace-stripped strings.

    Raises:
        requests.HTTPError: If the server responds with an error status, so
            that an error page is never returned as if it were content.
        requests.RequestException: On connection problems or timeout.
    """
    response = requests.get(web_url, timeout=timeout)
    response.raise_for_status()
    soup = bs4.BeautifulSoup(response.content, "html.parser")

    # Strip each text exactly once, lazily.
    stripped_texts = (div.text.strip() for div in soup.find_all("div"))

    # dict keys preserve insertion order, which gives an order-preserving
    # de-duplication in a single pass.
    return list(dict.fromkeys(text for text in stripped_texts if text))
async def create_vectorstore():
    """Build the FAISS vector store that backs the chatbot's retriever.

    Two sources are indexed:

    1. The text stored in the ``knowledge_base`` environment variable, split
       into chunks of 512 characters with an overlap of 20.
    2. The documents pickled in ``wi_knowledge.dat`` (read from the current
       working directory); each one is split with the same splitter and then
       added to the index.

    Embeddings are computed with the
    ``sentence-transformers/paraphrase-multilingual-mpnet-base-v2`` model and
    are normalized.

    Returns:
        The populated ``FAISS`` vector store.

    Raises:
        ValueError: If the ``knowledge_base`` environment variable is unset
            or empty.
        OSError: If ``wi_knowledge.dat`` cannot be opened.
    """
    knowledge_base = os.getenv('knowledge_base')
    if not knowledge_base:
        # Fail early with a clear message instead of an obscure error from
        # deep inside the text splitter / index construction.
        raise ValueError(
            "Environment variable 'knowledge_base' is not set or is empty; "
            "it must contain the knowledge-base text to index."
        )

    splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=20)
    docs = splitter.create_documents([knowledge_base])
    print(len(docs))

    emb_model = HuggingFaceEmbeddings(
        model_name='sentence-transformers/paraphrase-multilingual-mpnet-base-v2',
        encode_kwargs={'normalize_embeddings': True},
    )
    db = await FAISS.afrom_documents(docs, emb_model)

    # SECURITY NOTE: unpickling executes arbitrary code embedded in the file.
    # Only ship/load a wi_knowledge.dat that comes from a trusted source.
    with open("wi_knowledge.dat", "rb") as knowledge_file:
        pickled_docs = pickle.load(knowledge_file)
    print("Docs len :", len(pickled_docs))

    # Split each document in turn, then let the additions to the index run
    # concurrently.
    add_tasks = []
    for doc in pickled_docs:
        chunks = await splitter.atransform_documents([doc])
        add_tasks.append(db.aadd_documents(chunks))
    await asyncio.gather(*add_tasks)

    return db
def custom_chain_with_history(llm, memory):
    """Assemble the retrieval-augmented chat chain.

    The returned runnable expects a mapping with two keys:

    * ``"question"`` - the user's latest message; it is also the query sent
      to the retriever.
    * ``"memory"`` - a conversation memory object whose
      ``chat_memory.messages`` holds the previous turns.

    From those it fills the prompt's ``{context}`` (the 12 most similar
    chunks from the vector store, rendered as a numbered list),
    ``{chat_history}`` and ``{question}`` slots and pipes the prompt into
    ``llm``.

    Args:
        llm: The language model (any LangChain runnable) that receives the
            rendered prompt.
        memory: Not used when building the chain - the history is read from
            the ``"memory"`` key of the chain input at invocation time. The
            parameter is kept so existing callers keep working.

    Returns:
        The composed runnable ``{inputs} | prompt | llm``.

    Note:
        The vector store is built eagerly through ``asyncio.run``, so this
        function must be called from synchronous code (``asyncio.run`` cannot
        be used while another event loop is running in the same thread).
    """
    # Indonesian system prompt. In short: "You are an AI customer-service
    # chatbot assistant. Use the context below to find the most relevant
    # information. Answer based on the chat history, in the language the user
    # used last; if nothing relevant is found, treat the information as
    # confidential and do not disclose it."
    prompt = PromptTemplate.from_template("""<s><INST><|system|>
Anda adalah asisten AI Chatbot customer service.
Anda memiliki akses konteks dibawah ini untuk menemukan informasi yang paling relevan dengan kebutuhan user:
{context}
Berikan respon kepada user berdasarkan riwayat chat berikut dengan bahasa yang digunakan terakhir kali oleh user, jika tidak ada informasi yang relevan maka itu adalah informasi yang rahasia dan Anda tidak diizinkan untuk menyebarkan informasi tersebut kepada user:
{chat_history}
<|user|>
{question}
<|assistant|>
""")

    def prompt_memory(chat_memory):
        """Render the stored messages as ``<|assistant|>``/``<|user|>`` turns."""
        # isinstance (rather than an exact type check) so that subclasses of
        # AIMessage are still rendered as assistant turns.
        return "".join(
            f"<|assistant|>\n<s>{message.content}</s>\n"
            if isinstance(message, AIMessage)
            else f"<|user|>\n{message.content}\n"
            for message in chat_memory.chat_memory.messages
        )

    def format_docs(docs):
        """Render the retrieved documents as a 1-based numbered list."""
        return "\n".join(
            f"{index + 1}. {doc.page_content}" for index, doc in enumerate(docs)
        )

    retriever = asyncio.run(create_vectorstore()).as_retriever(
        search_type="similarity", search_kwargs={"k": 12}
    )

    chain_inputs = {
        "chat_history": lambda x: prompt_memory(x['memory']),
        "context": itemgetter("question") | retriever | format_docs,
        "question": lambda x: x['question'],
    }
    return chain_inputs | prompt | llm
class CustomLLM(LLM):
    """LangChain ``LLM`` that calls a model through the Hugging Face Inference API.

    Every call POSTs the prompt and the generation parameters configured on
    the instance to ``https://api-inference.huggingface.co/models/<repo_id>``
    and returns the ``generated_text`` of the first result.
    """

    # Model repository on the Hugging Face Hub, e.g. "org/model-name".
    repo_id: str
    # Token sent as the Bearer credential in the Authorization header.
    api_token: str
    # Task type; for "text-generation" the request also sets
    # return_full_text=False.
    model_type: Literal["text2text-generation", "text-generation"]
    # Generation parameters forwarded verbatim in the request's "parameters"
    # object. None values are forwarded as JSON null.
    max_new_tokens: Optional[int] = None
    temperature: float = 0.001
    # NOTE(review): this is forwarded as a generation parameter, not used as
    # the HTTP request timeout - confirm that this is the intended meaning.
    timeout: Optional[float] = None
    top_p: Optional[float] = None
    top_k: Optional[int] = None
    repetition_penalty: Optional[float] = None
    # Stop sequences applied to every call.
    stop: List[str] = []

    @property
    def _llm_type(self) -> str:
        """Return the identifier of this LLM implementation."""
        return "custom"

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Send ``prompt`` to the Inference API and return the generated text.

        Args:
            prompt: The fully rendered prompt.
            stop: Extra stop sequences for this call; they are appended to the
                instance-level ``stop`` list.
            run_manager: Unused; accepted for interface compatibility.
            **kwargs: Unused; accepted for interface compatibility.

        Returns:
            The ``generated_text`` field of the first item in the response.

        Raises:
            ValueError: If the API response is not a non-empty list (for
                example when the service answers with an error object).
        """
        headers = {"Authorization": f"Bearer {self.api_token}"}
        api_url = f"https://api-inference.huggingface.co/models/{self.repo_id}"

        # Honour call-time stop sequences in addition to the configured ones.
        stop_sequences = list(self.stop) + list(stop or [])

        parameters = {
            'max_new_tokens': self.max_new_tokens,
            'temperature': self.temperature,
            'timeout': self.timeout,
            'top_p': self.top_p,
            'top_k': self.top_k,
            'repetition_penalty': self.repetition_penalty,
            'stop': stop_sequences,
        }
        if self.model_type == 'text-generation':
            # Ask for the completion only, without the prompt echoed back.
            parameters["return_full_text"] = False

        payload = {
            "inputs": prompt,
            "parameters": parameters,
            "options": {"wait_for_model": True},
        }
        result = requests.post(api_url, headers=headers, json=payload).json()

        # Anything other than a non-empty list cannot be indexed as result[0];
        # surface the payload instead of an opaque KeyError/IndexError.
        if not isinstance(result, list) or not result:
            raise ValueError(
                f"Unexpected response from inference API: {result!r}"
            )
        return result[0]['generated_text']

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Get the identifying parameters."""
        return {
            'repo_id': self.repo_id,
            'model_type': self.model_type,
            'stop_sequences': self.stop,
            'max_new_tokens': self.max_new_tokens,
            'temperature': self.temperature,
            'timeout': self.timeout,
            'top_p': self.top_p,
            'top_k': self.top_k,
            'repetition_penalty': self.repetition_penalty,
        }