from threading import Thread from huggingface_hub import hf_hub_download import torch import gradio as gr import re import asyncio import requests import shutil from langchain import PromptTemplate, LLMChain from langchain.retrievers.document_compressors import EmbeddingsFilter from langchain.retrievers import ContextualCompressionRetriever from langchain.chains import RetrievalQA from langchain.vectorstores import FAISS from langchain.embeddings import HuggingFaceEmbeddings from langchain.text_splitter import CharacterTextSplitter from langchain.document_loaders import PyPDFLoader from langchain.text_splitter import RecursiveCharacterTextSplitter import os from langchain.llms import OpenAI llm = OpenAI(model_name='gpt-3.5-turbo-instruct') torch_device = "cuda" if torch.cuda.is_available() else "cpu" print("Running on device:", torch_device) print("CPU threads:", torch.get_num_threads()) loader = PyPDFLoader("total.pdf") pages = loader.load() # 데이터를 불러와서 텍스트를 일정한 수로 나누고 구분자로 연결하는 작업 text_splitter = RecursiveCharacterTextSplitter(chunk_size=600, chunk_overlap=0) texts = text_splitter.split_documents(pages) print(f"문서에 {len(texts)}개의 문서를 가지고 있습니다.") # 임베딩 모델 로드 embeddings = HuggingFaceEmbeddings(model_name="intfloat/multilingual-e5-large") # 문서에 있는 텍스트를 임베딩하고 FAISS 에 인덱스를 구축함 index = FAISS.from_documents( documents=texts, embedding=embeddings, ) # faiss_db 로 로컬에 저장하기 index.save_local("") # faiss_db 로 로컬에 로드하기 docsearch = FAISS.load_local("", embeddings) embeddings_filter = EmbeddingsFilter( embeddings=embeddings, similarity_threshold=0.7, k = 2, ) # 압축 검색기 생성 compression_retriever = ContextualCompressionRetriever( # embeddings_filter 설정 base_compressor=embeddings_filter, # retriever 를 호출하여 검색쿼리와 유사한 텍스트를 찾음 base_retriever=docsearch.as_retriever() ) id_list = [] history = [] customer_data_list = [] customer_agree_list = [] context = "{context}" question = "{question}" def gen(x, id, customer_data): index = 0 matched = 0 count = 0 for s in id_list: if s == id: matched = 1 break; index += 1 if matched == 0: index = len(id_list) id_list.append(id) customer_data_list.append(customer_data) if x != "약관동의_동의함": customer_agree_list.append("No") history.append('상담원:무엇을 도와드릴까요?\n\n') bot_str = "* 먼저 개인정보 이용 약관에 동의하셔야 원활한 상담을 진행할 수 있습니다. \n무엇을 도와드릴까요?" else: customer_agree_list.append("Yes") history.append('상담원:무엇을 도와드릴까요?\n\n') bot_str = f"개인정보 활용에 동의하셨습니다. 가입 보험을 조회합니다.\n\n현재 고객님께서 가입된 보험은 {customer_data}입니다.\n\n궁금하신 것이 있으신가요?" return bot_str else: if x == "초기화": if customer_agree_list[index] != "No": customer_data_list[index] = customer_data bot_str = f"대화기록이 모두 초기화되었습니다.\n\n현재 고객님께서 가입된 보험은 {customer_data}입니다.\n\n궁금하신 것이 있으신가요?" return bot_str else: customer_data_list[index] = "가입정보없음" history[index] = '상담원:무엇을 도와드릴까요?\n\n' bot_str = f"대화기록이 모두 초기화되었습니다.\n\n* 먼저 개인정보 이용 약관에 동의하셔야 원활한 상담을 진행할 수 있습니다.\n\n궁금하신 것이 있으신가요?" return bot_str elif x == "가입정보": if customer_agree_list[index] == "No": history[index] = '상담원:무엇을 도와드릴까요?\n\n' bot_str = f"* 먼저 개인정보 이용 약관에 동의하셔야 원활한 상담을 진행할 수 있습니다.\n\n궁금하신 것이 있으신가요?" return bot_str else: history[index] = '상담원:무엇을 도와드릴까요?\n' bot_str = f"현재 고객님께서 가입된 보험은 {customer_data_list[index]}입니다.\n\n궁금하신 것이 있으신가요?" return bot_str elif x == "약관동의_동의함": if customer_agree_list[index] == "No": history[index] = '상담원:무엇을 도와드릴까요?\n\n' customer_agree_list[index] = "Yes" customer_data_list[index] = customer_data bot_str = f"개인정보 활용에 동의하셨습니다. 가입 보험을 조회합니다.\n\n현재 고객님께서 가입된 보험은 {customer_data}입니다.\n\n궁금하신 것이 있으신가요?" return bot_str else: history[index] = '상담원:무엇을 도와드릴까요?\n' bot_str = f"이미 약관에 동의하셨습니다.\n\n궁금하신 것이 있으신가요?" return bot_str elif x == "약관동의_동의안함": if customer_agree_list[index] == "Yes": history[index] = '상담원:무엇을 도와드릴까요?\n\n' customer_agree_list[index] = "No" customer_data_list[index] = "가입정보없음" bot_str = f"* 개인정보 활용 동의를 취소하셨습니다. 이제 가입 보험을 조회할 수 없습니다.\n\n궁금하신 것이 있으신가요?" return bot_str else: history[index] = '상담원:무엇을 도와드릴까요?\n\n' bot_str = f"* 개인정보 활용을 거절하셨습니다. 가입 보험을 조회할 수 없습니다. \n\n궁금하신 것이 있으신가요?" return bot_str else: context = "{context}" question = "{question}" if customer_agree_list[index] == "No": customer_data_newline = "현재 가입정보를 조회할 수 없습니다. 가입할 수 있는 관련 보험을 위의 목록에서 소개해주세요." else: customer_data_newline = customer_data_list[index].replace(",","\n") prompt_template = f"""당신은 보험 상담원입니다. 아래에 전체 보험 목록, 질문과 관련된 약관 정보, 고객의 보험 가입 정보, 고객과의 상담기록이 주어집니다. 요청을 적절히 완료하는 응답을 작성하세요. 완성된 문장으로 간결히 답하세요. [전체 보험 목록] 라이프플래닛정기보험Ⅱ 라이프플래닛종신보험 라이프플래닛상해보험 만기까지비갱신암보험Ⅱ 라이프플래닛암보험Ⅲ 암·뇌·심장건강보험 뇌·심장건강보험 여성건강보험 건강치아보험 입원비보험 수술비보험 라이프플래닛플러스어린이보험Ⅱ 라이프플래닛플러스어린이종합보험 라이프플래닛에듀케어저축보험Ⅱ 라이프플래닛연금저축보험Ⅱ 1년부터저축보험 라이프플래닛연금보험Ⅱ 고객은 보험 목록과 약관을 볼 수 없습니다. 직접 제시하여 소개하세요. {context} [고객의 가입 정보] {customer_data_newline} ### 명령어: 주어지는 이전 대화를 보고 맥락을 파악하여 상담원으로서 고객에게 필요한 정보를 최대한 길고 자세하고 친절하게 제공하세요. 일반적인 보험 관련 지식은 해당 내용만 간결히 답변하세요. ### 질문: {question} ### 입력: [이전 대화] {history[index]} ### 응답: """ # RetrievalQA 클래스의 from_chain_type이라는 클래스 메서드를 호출하여 질의응답 객체를 생성 qa = RetrievalQA.from_chain_type( llm=llm, chain_type="stuff", retriever=compression_retriever, return_source_documents=False, verbose=True, chain_type_kwargs={"prompt": PromptTemplate( input_variables=["context","question"], template=prompt_template, )}, ) if customer_agree_list[index] == "No": query=f"{x}" else: query=f"{x}" response = qa({"query":query}) output_str = response['result'].rsplit(".",1)[0] + "." if output_str.split(":")[0]=="상담원": output_str = output_str.split(":")[1] history[index] += f"고객:{x}\n\n상담원:{output_str}\n\n" if customer_agree_list[index] == "No": output_str = f"* 먼저 개인정보 이용 약관에 동의하셔야 원활한 상담을 진행할 수 있습니다.\n\n" + output_str return output_str def reset_textbox(): return gr.update(value='') with gr.Blocks() as demo: gr.Markdown( "duplicated from beomi/KoRWKV-1.5B, baseModel:Llama-2-ko-7B-chat-gguf-q4_0" ) with gr.Row(): with gr.Column(scale=4): user_text = gr.Textbox( placeholder='입력', label="User input" ) model_output = gr.Textbox(label="Model output", lines=10, interactive=False) button_submit = gr.Button(value="Submit") with gr.Column(scale=1): id_text = gr.Textbox( placeholder='772727', label="User id" ) customer_data = gr.Textbox( placeholder='(무)1년부터저축보험, (무)수술비보험', label="customer_data" ) button_submit.click(gen, [user_text, id_text, customer_data], model_output) demo.queue().launch(enable_queue=True)