from threading import Thread
from huggingface_hub import hf_hub_download
import torch
import gradio as gr
import re
import asyncio
import requests
import shutil
import os
from langchain import PromptTemplate, LLMChain
from langchain.retrievers.document_compressors import EmbeddingsFilter
from langchain.retrievers import ContextualCompressionRetriever
from langchain.chains import RetrievalQA
from langchain.vectorstores import FAISS
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter
from langchain.document_loaders import PyPDFLoader
from langchain.llms import OpenAI
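# NOTE: langchain's OpenAI wrapper reads the API key from the OPENAI_API_KEY
# environment variable (e.g. a Spaces secret); it must be set for the chain to run.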
llm = OpenAI(model_name='gpt-3.5-turbo-instruct')
torch_device = "cuda" if torch.cuda.is_available() else "cpu"
print("Running on device:", torch_device)
print("CPU threads:", torch.get_num_threads())
loader = PyPDFLoader("total.pdf")
pages = loader.load()

# Load the data and split the text into fixed-size chunks
text_splitter = RecursiveCharacterTextSplitter(chunk_size=600, chunk_overlap=0)
texts = text_splitter.split_documents(pages)

print(f"The document was split into {len(texts)} chunks.")
# Load the embedding model
embeddings = HuggingFaceEmbeddings(model_name="intfloat/multilingual-e5-large")

# Embed the document chunks and build a FAISS index
index = FAISS.from_documents(
    documents=texts,
    embedding=embeddings,
)
# Save the index locally as faiss_db
index.save_local("./faiss_db")

# Load the index back from faiss_db
docsearch = FAISS.load_local("./faiss_db", embeddings)
# Filter retrieved chunks by embedding similarity to the query
embeddings_filter = EmbeddingsFilter(
    embeddings=embeddings,
    similarity_threshold=0.7,
    k=2,
)

# Create the compression retriever
compression_retriever = ContextualCompressionRetriever(
    # use embeddings_filter as the compressor
    base_compressor=embeddings_filter,
    # the base retriever finds chunks similar to the search query
    base_retriever=docsearch.as_retriever()
)
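# Per-user session state; the four lists below are indexed in parallel by the
# position of the user id in id_list.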
id_list = []
history = []
customer_data_list = []
customer_agree_list = []
context = "{context}"
question = "{question}"
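# gen() handles one chat turn. The special inputs "약관동의_동의함" (agree to the terms),
# "약관동의_동의안함" (withdraw consent), "초기화" (reset) and "가입정보" (enrollment info)
# are treated as commands that update the per-user consent and enrollment state;
# any other input is sent to the RetrievalQA chain as a free-form question.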
def gen(x, id, customer_data):
    # Look up the position of this user id in the session lists
    index = 0
    matched = 0
    for s in id_list:
        if s == id:
            matched = 1
            break
        index += 1
    if matched == 0:
        # First turn for this user: register id, enrollment data and consent state
        index = len(id_list)
        id_list.append(id)
        customer_data_list.append(customer_data)
        if x != "약관동의_동의함":
            customer_agree_list.append("No")
            history.append('상담원:무엇을 도와드릴까요?\n\n')
            bot_str = "* 먼저 개인정보 이용 약관에 동의하셔야 원활한 상담을 진행할 수 있습니다. \n무엇을 도와드릴까요?"
        else:
            customer_agree_list.append("Yes")
            history.append('상담원:무엇을 도와드릴까요?\n\n')
            bot_str = f"개인정보 활용에 동의하셨습니다. 가입 보험을 조회합니다.\n\n현재 고객님께서 가입된 보험은 {customer_data}입니다.\n\n궁금하신 것이 있으신가요?"
        return bot_str
    else:
        if x == "초기화":
            if customer_agree_list[index] != "No":
                customer_data_list[index] = customer_data
                history[index] = '상담원:무엇을 도와드릴까요?\n\n'  # reset the stored conversation as well
                bot_str = f"대화기록이 모두 초기화되었습니다.\n\n현재 고객님께서 가입된 보험은 {customer_data}입니다.\n\n궁금하신 것이 있으신가요?"
                return bot_str
            else:
                customer_data_list[index] = "가입정보없음"
                history[index] = '상담원:무엇을 도와드릴까요?\n\n'
                bot_str = "대화기록이 모두 초기화되었습니다.\n\n* 먼저 개인정보 이용 약관에 동의하셔야 원활한 상담을 진행할 수 있습니다.\n\n궁금하신 것이 있으신가요?"
                return bot_str
        elif x == "가입정보":
            if customer_agree_list[index] == "No":
                history[index] = '상담원:무엇을 도와드릴까요?\n\n'
                bot_str = "* 먼저 개인정보 이용 약관에 동의하셔야 원활한 상담을 진행할 수 있습니다.\n\n궁금하신 것이 있으신가요?"
                return bot_str
            else:
                history[index] = '상담원:무엇을 도와드릴까요?\n'
                bot_str = f"현재 고객님께서 가입된 보험은 {customer_data_list[index]}입니다.\n\n궁금하신 것이 있으신가요?"
                return bot_str
        elif x == "약관동의_동의함":
            if customer_agree_list[index] == "No":
                history[index] = '상담원:무엇을 도와드릴까요?\n\n'
                customer_agree_list[index] = "Yes"
                customer_data_list[index] = customer_data
                bot_str = f"개인정보 활용에 동의하셨습니다. 가입 보험을 조회합니다.\n\n현재 고객님께서 가입된 보험은 {customer_data}입니다.\n\n궁금하신 것이 있으신가요?"
                return bot_str
            else:
                history[index] = '상담원:무엇을 도와드릴까요?\n'
                bot_str = "이미 약관에 동의하셨습니다.\n\n궁금하신 것이 있으신가요?"
                return bot_str
        elif x == "약관동의_동의안함":
            if customer_agree_list[index] == "Yes":
                history[index] = '상담원:무엇을 도와드릴까요?\n\n'
                customer_agree_list[index] = "No"
                customer_data_list[index] = "가입정보없음"
                bot_str = "* 개인정보 활용 동의를 취소하셨습니다. 이제 가입 보험을 조회할 수 없습니다.\n\n궁금하신 것이 있으신가요?"
                return bot_str
            else:
                history[index] = '상담원:무엇을 도와드릴까요?\n\n'
                bot_str = "* 개인정보 활용을 거절하셨습니다. 가입 보험을 조회할 수 없습니다. \n\n궁금하신 것이 있으신가요?"
                return bot_str
        else:
            # Free-form question: build the prompt and query the RetrievalQA chain.
            # The literal "{context}" / "{question}" placeholders survive the f-string
            # below so that PromptTemplate can fill them in at query time.
            context = "{context}"
            question = "{question}"
            if customer_agree_list[index] == "No":
                customer_data_newline = "현재 가입정보를 조회할 수 없습니다. 가입할 수 있는 관련 보험을 위의 목록에서 소개해주세요."
            else:
                customer_data_newline = customer_data_list[index].replace(",", "\n")
            prompt_template = f"""당신은 보험 상담원입니다. 아래에 전체 보험 목록, 질문과 관련된 약관 정보, 고객의 보험 가입정보, 고객과의 상담기록이 주어집니다. 요청을 적절히 완료하는 응답을 작성하세요. 완성된 문장으로 간결히 답하세요.
[전체 보험 목록]
라이프플래닛정기보험Ⅱ
라이프플래닛종신보험
라이프플래닛상해보험
만기까지비갱신암보험Ⅱ
라이프플래닛암보험Ⅲ
암·뇌·심장건강보험
뇌·심장건강보험
여성건강보험
건강치아보험
입원비보험
수술비보험
라이프플래닛플러스어린이보험Ⅱ
라이프플래닛플러스어린이종합보험
라이프플래닛에듀케어저축보험Ⅱ
라이프플래닛연금저축보험Ⅱ
1년부터저축보험
라이프플래닛연금보험Ⅱ
고객은 보험 목록과 약관을 볼 수 없습니다. 직접 인용하여 소개하세요.
{context}
[고객의 가입정보]
{customer_data_newline}
### 명령어:
주어지는 이전 대화를 보고 맥락을 파악하여 상담원으로서 고객에게 필요한 정보를 최대한 길고 상세하고 친절하게 제공하세요. 일반적인 보험 관련 질문은 해당 내용만 간결히 답변하세요.
### 질문:
{question}
### 입력:
[이전 대화]
{history[index]}
### 응답:
"""
            # Call the RetrievalQA.from_chain_type class method to build the QA chain
            qa = RetrievalQA.from_chain_type(
                llm=llm,
                chain_type="stuff",
                retriever=compression_retriever,
                return_source_documents=False,
                verbose=True,
                chain_type_kwargs={"prompt": PromptTemplate(
                    input_variables=["context", "question"],
                    template=prompt_template,
                )},
            )
            query = x
            response = qa({"query": query})
            # Drop the trailing partial sentence and a leading "상담원:" prefix, if any
            output_str = response['result'].rsplit(".", 1)[0] + "."
            if output_str.split(":")[0] == "상담원":
                output_str = output_str.split(":")[1]
            history[index] += f"고객:{x}\n\n상담원:{output_str}\n\n"
            if customer_agree_list[index] == "No":
                output_str = "* 먼저 개인정보 이용 약관에 동의하셔야 원활한 상담을 진행할 수 있습니다.\n\n" + output_str
            return output_str
def reset_textbox():
    return gr.update(value='')
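# Gradio UI: the wide column takes the user's message and shows the model output,
# the narrow column holds the user id and the customer's enrolled products;
# all three values are passed to gen() when Submit is clicked.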
with gr.Blocks() as demo:
    gr.Markdown(
        "duplicated from beomi/KoRWKV-1.5B, baseModel:Llama-2-ko-7B-chat-gguf-q4_0"
    )
    with gr.Row():
        with gr.Column(scale=4):
            user_text = gr.Textbox(
                placeholder='입력',
                label="User input"
            )
            model_output = gr.Textbox(label="Model output", lines=10, interactive=False)
            button_submit = gr.Button(value="Submit")
        with gr.Column(scale=1):
            id_text = gr.Textbox(
                placeholder='772727',
                label="User id"
            )
            customer_data = gr.Textbox(
                placeholder='(무)1년부터저축보험, (무)수술비보험',
                label="customer_data"
            )
    button_submit.click(gen, [user_text, id_text, customer_data], model_output)

demo.queue().launch()