"""ChatFAQ: a Streamlit front end for several chatbot back ends.

The sidebar selects an app mode (hosted Blenderbot-1B, local
Blenderbot-400M, ChatGPT-3.5 with optional document question answering,
and Alpaca variants) and optionally accepts a PDF or plain-text document.

Credentials are read from the environment and must never be committed:

* ``OPENAI_API_KEY`` -- used by the ChatGPT-3.5 mode (and by langchain).
* ``HF_API_TOKEN``   -- used by the hosted Blenderbot-1B mode.

NOTE: earlier revisions of this file embedded live credentials in source;
those credentials must be treated as leaked and revoked.
"""

import logging
import os
import tempfile
import warnings
from io import BytesIO
from io import StringIO
from pathlib import Path
from typing import Optional

import openai
import PyPDF2
import requests
import streamlit as st
import torch
from langchain.chains import RetrievalQA
from langchain.document_loaders import TextLoader
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.llms import OpenAI
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import Chroma
from streamlit_chat import message as st_message
from transformers import BlenderbotForConditionalGeneration
from transformers import BlenderbotTokenizer
from transformers import GenerationConfig, LlamaForCausalLM, LlamaTokenizer

warnings.filterwarnings("ignore")

logger = logging.getLogger(__name__)

# Names of the environment variables that hold the credentials.
OPENAI_KEY_ENV = "OPENAI_API_KEY"
HF_TOKEN_ENV = "HF_API_TOKEN"

BLENDERBOT_1B_URL = (
    "https://api-inference.huggingface.co/models/facebook/blenderbot-1B-distill"
)
# Without a timeout a stalled inference endpoint would hang the session.
HF_REQUEST_TIMEOUT_SECONDS = 60

ALPACA_LORA_MODEL = "chainyo/alpaca-lora-7b"

APP_MODES = [
    'Blenderbot_1B',
    'Blenderbot-400M-distill',
    'ChatGPT-3.5',
    'Fine-tune Alpaca 7B',
    'Customized Alpaca 7B',
    'Alpaca-LORA',
]

# Pieces of the prompt that wraps retrieved content for the document-QA flow.
DOC_PROMPT_HEAD = (
    "\nYou are a helpful assisant that help user with answering questions "
    "over a content that was pulled from a database\n"
    "---CONTENT START---\n\n"
)
DOC_PROMPT_TAIL = (
    "\n\n---CONTENT END---\n"
    "Based on information pulled from the database, answer the question "
    "below from the user.\n"
    "If the content pulled from the database is not related to the question, "
    'say "I do not have enough information for this question"\n'
    "Question: "
)
DOC_PROMPT_ANSWER = "\nAnswer:"


@st.cache_resource
def get_models():
    """Load and cache the Blenderbot-400M tokenizer and model.

    Returns:
        A ``(tokenizer, model)`` tuple for ``facebook/blenderbot-400M-distill``.
    """
    # Cached so the weights are loaded once per process, not on every rerun.
    model_name = "facebook/blenderbot-400M-distill"
    tokenizer = BlenderbotTokenizer.from_pretrained(model_name)
    model = BlenderbotForConditionalGeneration.from_pretrained(model_name)
    return tokenizer, model


@st.cache_resource
def get_alpaca_models():
    """Load and cache the Alpaca-LoRA 7B tokenizer and model.

    Returns:
        A ``(tokenizer, model)`` tuple; the model is in eval mode and is
        compiled with ``torch.compile`` when running on PyTorch 2 or newer.
    """
    tokenizer = LlamaTokenizer.from_pretrained(ALPACA_LORA_MODEL)
    model = LlamaForCausalLM.from_pretrained(
        ALPACA_LORA_MODEL,
        load_in_8bit=True,
        torch_dtype=torch.float16,
        device_map="auto",
    )
    model.eval()
    # Compare the numeric major version; a plain string comparison would
    # order "10.0" before "2".
    if int(torch.__version__.split(".")[0]) >= 2:
        model = torch.compile(model)
    return tokenizer, model


def generate_prompt(instruction: str, input_ctxt: Optional[str] = None) -> str:
    """Build an Alpaca-style prompt.

    Args:
        instruction: The task the model should carry out.
        input_ctxt: Optional extra context for the task.

    Returns:
        The prompt text, ending with the ``### Response:`` marker.
    """
    if input_ctxt:
        return f"""Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
{instruction}

### Input:
{input_ctxt}

### Response:"""
    return f"""Below is an instruction that describes a task. Write a response that appropriately completes the request.

### Instruction:
{instruction}

### Response:"""


def load_uploaded_text(uploaded_file) -> str:
    """Extract the text of an uploaded PDF or plain-text file.

    Args:
        uploaded_file: The object returned by ``st.file_uploader``.

    Returns:
        The extracted text, or ``""`` when the file type is not supported.
    """
    file_type = uploaded_file.type
    if file_type == "application/pdf":
        pdf_reader = PyPDF2.PdfReader(BytesIO(uploaded_file.getvalue()))
        # extract_text() may yield None for pages without a text layer.
        text = "".join(page.extract_text() or "" for page in pdf_reader.pages)
    elif file_type == "text/plain":
        with st.spinner('Loading the document...'):
            text = uploaded_file.getvalue().decode("utf-8")
    else:
        st.warning(f"Unsupported file type: {file_type}")
        return ""
    st.success('Loading successfully!')
    return text


def render_history(history, key_prefix: str) -> None:
    """Draw every chat entry once, giving each widget a unique key.

    Args:
        history: List of ``{"message": str, "is_user": bool}`` entries.
        key_prefix: Prefix that keeps keys distinct between app modes.
    """
    for index, chat in enumerate(history):
        st_message(**chat, key=f"{key_prefix}_{index}")


def build_retrieval_chain(text: str):
    """Index ``text`` in a Chroma store and return a RetrievalQA chain.

    The text is routed through a temporary file because ``TextLoader`` reads
    from a path; a per-call temporary directory keeps concurrent sessions
    from overwriting each other's document.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        text_path = Path(tmp_dir) / "my_text.txt"
        text_path.write_text(text, encoding="utf-8")
        documents = TextLoader(str(text_path), encoding="utf-8").load()

    text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    texts = text_splitter.split_documents(documents)
    docsearch = Chroma.from_documents(texts, OpenAIEmbeddings())
    return RetrievalQA.from_chain_type(
        llm=OpenAI(),
        chain_type="stuff",
        retriever=docsearch.as_retriever(search_kwargs={"k": 1}),
    )


def run_blenderbot_1b() -> None:
    """Chat through the hosted Blenderbot-1B inference endpoint."""
    st.markdown('In this application, **Blenderbot_1B API** is used and **StreamLit** is to create the Web Graphical User Interface (GUI).')
    st.markdown(""" """, unsafe_allow_html=True)

    if 'history1' not in st.session_state:
        st.session_state['history1'] = []

    api_token = os.environ.get(HF_TOKEN_ENV)
    if not api_token:
        st.error(f"Set the {HF_TOKEN_ENV} environment variable to use this mode.")
        return
    headers = {"Authorization": f"Bearer {api_token}"}

    def query(payload):
        """POST ``payload`` to the inference endpoint and return its JSON."""
        response = requests.post(
            BLENDERBOT_1B_URL,
            headers=headers,
            json=payload,
            timeout=HF_REQUEST_TIMEOUT_SECONDS,
        )
        return response.json()

    def generate_answer():
        """Send the conversation so far plus the new input; store the reply."""
        history = st.session_state['history1']
        user_message = st.session_state.input_text
        conversation = {
            "past_user_inputs": [c["message"] for c in history if c["is_user"]],
            "generated_responses": [
                c["message"] for c in history if not c["is_user"]
            ],
            # A single space stands in for an empty message.
            "text": user_message if user_message != "" else " ",
        }
        try:
            output = query({"inputs": conversation})
        except (requests.RequestException, ValueError) as err:
            logger.exception("Blenderbot-1B request failed")
            st.error(f"Request to the Blenderbot API failed: {err}")
            return

        # Failures (for example while the model is still loading) come back
        # as a payload without "generated_text".
        if not isinstance(output, dict) or "generated_text" not in output:
            logger.error("Unexpected Blenderbot-1B response: %s", output)
            detail = output.get("error") if isinstance(output, dict) else output
            st.error(f"The Blenderbot API returned no answer: {detail}")
            return

        history.append({"message": user_message, "is_user": True})
        history.append({"message": output["generated_text"], "is_user": False})

    st.text_input("Talk to the bot", key="input_text", on_change=generate_answer)
    if st.button("Clear"):
        st.session_state["history1"] = []
    render_history(st.session_state['history1'], "history1")


def run_blenderbot_400m() -> None:
    """Chat with the locally loaded Blenderbot-400M-distill model."""
    st.markdown('In this application, **Blenderbot-400M-distill API** is used and **StreamLit** is to create the Web Graphical User Interface (GUI).')
    st.markdown(""" """, unsafe_allow_html=True)

    if 'history2' not in st.session_state:
        st.session_state['history2'] = []

    def generate_answer():
        """Generate a reply from earlier user messages plus the new input."""
        tokenizer, model = get_models()
        history = st.session_state['history2']
        user_message = st.session_state.input_text
        # Only the user's earlier messages are used as context.
        past_inputs = ". ".join(c["message"] for c in history if c["is_user"])
        inputs = tokenizer(past_inputs + " . " + user_message, return_tensors="pt")
        result = model.generate(**inputs)
        message_bot = tokenizer.decode(result[0], skip_special_tokens=True)
        history.append({"message": user_message, "is_user": True})
        history.append({"message": message_bot, "is_user": False})

    st.text_input("Talk to the bot", key="input_text", on_change=generate_answer)
    if st.button("Clear"):
        st.session_state["history2"] = []
    render_history(st.session_state['history2'], "history2")


def _chat_completion(state_key: str, content: str) -> str:
    """Send ``content`` as a user turn and record both turns in session state.

    Args:
        state_key: Session-state key holding the message list to extend.
        content: Text of the new user message.

    Returns:
        The assistant's reply text.
    """
    messages = st.session_state[state_key] + [{"role": "user", "content": content}]
    response = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=messages)
    reply = response["choices"][0]["message"]["content"]
    # Model replies must carry the "assistant" role; "system" is reserved
    # for instructions. State is only updated once the call has succeeded.
    st.session_state[state_key] = messages + [{"role": "assistant", "content": reply}]
    return reply


def run_chatgpt(uploaded_file, string_data: str) -> None:
    """Chat with gpt-3.5-turbo, optionally grounded in the uploaded document.

    Args:
        uploaded_file: The sidebar upload, or ``None`` when nothing is uploaded.
        string_data: Text extracted from the upload (``""`` when unavailable).
    """
    api_key = os.environ.get(OPENAI_KEY_ENV)
    if not api_key:
        st.error(f"Set the {OPENAI_KEY_ENV} environment variable to use this mode.")
        return
    openai.api_key = api_key

    for state_key in ("history3", "messages", "messagesDocument"):
        if state_key not in st.session_state:
            st.session_state[state_key] = []

    # Removing the upload switches back to plain chat.
    if uploaded_file is None:
        st.session_state.pop("qa_chain", None)

    def generate_answer():
        """Answer the new input, using the document chain when one exists."""
        user_message = st.session_state.input_text
        qa_chain = st.session_state.get("qa_chain")
        try:
            if qa_chain is None:
                reply = _chat_completion("messages", user_message)
            else:
                retrieved = qa_chain.run(user_message)
                prompt = (
                    DOC_PROMPT_HEAD
                    + retrieved
                    + DOC_PROMPT_TAIL
                    + user_message
                    + DOC_PROMPT_ANSWER
                )
                reply = _chat_completion("messagesDocument", prompt)
        except openai.error.OpenAIError as err:
            logger.exception("OpenAI request failed")
            st.error(f"Request to OpenAI failed: {err}")
            return
        st.session_state['history3'].append({"message": user_message, "is_user": True})
        st.session_state['history3'].append({"message": reply, "is_user": False})

    if st.button("Retrieve the document's content"):
        if uploaded_file is None:
            st.error("Please input the document!", icon="🚨")
        elif not string_data.strip():
            st.error("The uploaded document has no readable text.", icon="🚨")
        else:
            with st.spinner('Wait for processing the document...'):
                # Kept in session state so the chain survives reruns; the
                # button is only True during the run triggered by the click.
                st.session_state["qa_chain"] = build_retrieval_chain(string_data)
            st.success('Successful!')

    st.markdown(""" """, unsafe_allow_html=True)

    st.text_input("Talk to the bot: ", placeholder="Ask me anything ...", key="input_text", on_change=generate_answer)
    if st.button("Clear"):
        st.session_state["history3"] = []
        st.session_state["messages"] = []
        st.session_state["messagesDocument"] = []
    render_history(st.session_state['history3'], "history3")


def run_finetune_alpaca() -> None:
    """Show the placeholder page for the fine-tuned Alpaca 7B mode."""
    st.markdown('In this application, we are using **Fine-tune Alpaca 7B API** and **StreamLit** is to create the Web Graphical User Interface (GUI). ')
    st.markdown(""" """, unsafe_allow_html=True)


def run_customized_alpaca() -> None:
    """Show the placeholder page for the customized Alpaca 7B mode."""
    # The previous description was copied from an unrelated project.
    st.markdown('In this application, we are using **Customized Alpaca 7B** and **StreamLit** is to create the Web Graphical User Interface (GUI). ')
    st.markdown(""" """, unsafe_allow_html=True)


def run_alpaca_lora() -> None:
    """Run a fixed demo instruction through Alpaca-LoRA 7B and show the reply."""
    # The previous description was copied from an unrelated project.
    st.markdown('In this application, we are using **Alpaca-LORA** and **StreamLit** is to create the Web Graphical User Interface (GUI). ')
    st.markdown(""" """, unsafe_allow_html=True)

    tokenizer, model = get_alpaca_models()
    generation_config = GenerationConfig(
        temperature=0.2,
        top_p=0.75,
        top_k=40,
        num_beams=4,
        max_new_tokens=128,
    )

    instruction = "What is the meaning of life?"
    # For some tasks an input context helps the model give a better response.
    input_ctxt = None

    prompt = generate_prompt(instruction, input_ctxt)
    input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(model.device)
    with torch.no_grad():
        outputs = model.generate(
            input_ids=input_ids,
            generation_config=generation_config,
            return_dict_in_generate=True,
            output_scores=True,
        )
    response = tokenizer.decode(outputs.sequences[0], skip_special_tokens=True)
    logger.info("Alpaca-LoRA response: %s", response)
    st.write(response)


def main() -> None:
    """Draw the shared layout, load the upload and run the selected mode."""
    st.markdown(""" """, unsafe_allow_html=True)

    st.sidebar.title('ChatFAQ')
    st.sidebar.subheader('Parameters')

    st.title("ChatFAQ")
    app_mode = st.sidebar.selectbox('Choose the App mode', APP_MODES)
    # TODO: optional domain selector ('Law', 'Economic', 'Technology').

    uploaded_file = st.sidebar.file_uploader("Choose a file")
    string_data = ""
    if uploaded_file is not None:
        string_data = load_uploaded_text(uploaded_file)

    handlers = {
        'Blenderbot_1B': run_blenderbot_1b,
        'Blenderbot-400M-distill': run_blenderbot_400m,
        'ChatGPT-3.5': lambda: run_chatgpt(uploaded_file, string_data),
        'Fine-tune Alpaca 7B': run_finetune_alpaca,
        'Customized Alpaca 7B': run_customized_alpaca,
        'Alpaca-LORA': run_alpaca_lora,
    }
    handlers[app_mode]()


# Streamlit re-executes this script from top to bottom on every interaction,
# so the page is drawn unconditionally at module level.
main()