|
import json
import os
import tempfile
import time

import gradio as gr
import langchain
import openai
import pandas as pd
import pdfplumber
import pinecone
import requests
from cnocr import CnOcr
from langchain import OpenAI
from langchain.chains.question_answering import load_qa_chain
from langchain.document_loaders import UnstructuredPowerPointLoader
from langchain.document_loaders import UnstructuredWordDocumentLoader
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import Pinecone
from sentence_transformers import SentenceTransformer, models, util
|
# Sentence embedder assembled from explicit modules: a transformer backbone
# followed by a pooling layer that reduces token embeddings to one vector.
# NOTE(review): pooling_mode='cls' is what this file has always used; the
# published all-MiniLM-L6-v2 setup is believed to use mean pooling — confirm
# that CLS pooling is intentional before changing it.
word_embedding_model = models.Transformer(
    'sentence-transformers/all-MiniLM-L6-v2',
    do_lower_case=True,
)
pooling_model = models.Pooling(
    word_embedding_model.get_word_embedding_dimension(),
    pooling_mode='cls',
)
embedder = SentenceTransformer(modules=[word_embedding_model, pooling_model])

# OCR engine used by up_file() to read text out of images embedded in PDFs.
ocr = CnOcr()
|
|
|
# Remote endpoints.  get_response() posts to chat_url; chat_emd is not
# referenced anywhere else in this file.
chat_url = 'https://Raghav001-API.hf.space/chatpdf'
chat_emd = 'https://Raghav001-API.hf.space/embedd'

# Headers sent with every JSON POST made by this module.
headers = {'Content-Type': 'application/json'}

# Length budgets, measured with len() on the strings involved:
# history_max_len caps the question plus the chat history that is replayed,
# all_max_len caps that total plus the document lines added as context.
history_max_len = 500
all_max_len = 3000
|
|
|
|
|
|
|
|
|
# Pinecone client setup.  The API key and environment are read from the
# PINECONE_API_KEY / PINECONE_ENVIRONMENT environment variables when set; the
# literals are kept only as fallbacks so existing deployments keep working.
# NOTE(review): the fallback key is a credential committed to source control.
# It should be rotated, and the literal removed once every deployment
# provides the environment variable.
pinecone.init(
    api_key=os.environ.get(
        'PINECONE_API_KEY', 'd0a5b89b-b901-4b47-bc99-38b93695390d'
    ),
    environment=os.environ.get('PINECONE_ENVIRONMENT', 'asia-southeast1-gcp'),
)

# Handle to the 'test' index.
index = pinecone.Index(index_name='test')
|
|
|
|
|
def get_emb(text):
    """Fetch an embedding for *text* from the remote embeddings endpoint.

    Posts ``{"content": text}`` as JSON and reads ``data[0]['embedding']``
    from the JSON reply.

    Args:
        text: The text to embed.

    Returns:
        The value found at ``data[0]['embedding']`` in the reply, or ``None``
        when the request fails or the reply does not have that shape.
    """
    emb_url = 'https://Raghav001-API.hf.space/embeddings'
    data = {"content": text}

    # Stays None if the POST itself raises, so the error report below never
    # touches an unbound name.
    result = None
    try:
        result = requests.post(
            url=emb_url,
            data=json.dumps(data),
            headers=headers,
        )
        # Parse the reply once; ValueError covers a non-JSON body, the
        # lookup errors cover a JSON body with an unexpected layout.
        embedding = result.json()['data'][0]['embedding']
    except (requests.RequestException, ValueError,
            KeyError, IndexError, TypeError) as e:
        # Report the raw body rather than calling .json() again: re-parsing
        # would raise a second time inside the handler.
        body = result.text if result is not None else None
        print('data', data, 'error', repr(e), 'result', body)
        return None

    print("--------------------------------Embeddings--------------------------------------")
    print(embedding)
    return embedding
|
|
|
|
|
def doc_emb(doc: str):
    """Embed each line of *doc* and reveal the chat widgets.

    Args:
        doc: The document text; it is split on newlines and every resulting
            line is embedded as its own entry.

    Returns:
        A 6-tuple matching the outputs wired to the submit button: the list
        of lines, their embeddings as returned by ``embedder.encode``, and
        Gradio updates that show the message box, show the clear button, set
        the status text and show the chatbot.
    """
    texts = doc.split('\n')
    emb_list = embedder.encode(texts)

    print('emb_list', emb_list)
    print('\n'.join(texts))

    # The original also built a Textbox update here and threw it away; only
    # the updates returned below ever reach the UI.
    return (
        texts,
        emb_list,
        gr.Textbox.update(visible=True),
        gr.Button.update(visible=True),
        gr.Markdown.update(value="""success ! Let's talk"""),
        gr.Chatbot.update(visible=True),
    )
|
|
|
|
|
def _history_window(msg, bot, limit):
    """Return the newest chat turns fitting in *limit*, and the length used."""
    used = len(msg)
    start = len(bot)
    # Walk backwards from the newest turn and stop at the first one that
    # would push the running length past the limit.
    for pos in range(len(bot) - 1, -1, -1):
        turn_len = len(bot[pos][0]) + len(bot[pos][1])
        if used + turn_len > limit:
            break
        used += turn_len
        start = pos
    return bot[start:], used


def _context_indices(ranked, doc_text_list, used, limit):
    """Return the sorted indices of document lines to send as context."""
    chosen = set()
    for hit in ranked:
        # Take the hit itself, then the line before it, then the line after.
        for cand in (hit, hit - 1, hit + 1):
            if cand < 0 or cand >= len(doc_text_list):
                continue
            if cand in chosen:
                # Already paid for (it was added as a neighbour of an earlier
                # hit); charging it again would waste budget.
                continue
            cost = len(doc_text_list[cand])
            if used + cost > limit:
                # Budget exhausted: stop selecting altogether.
                return sorted(chosen)
            chosen.add(cand)
            used += cost
    return sorted(chosen)


def get_response(msg, bot, doc_text_list, doc_embeddings):
    """Answer *msg* through the remote chat endpoint using document context.

    The request carries the question, as much recent chat history as fits in
    ``history_max_len``, and the document lines most similar to the question
    (each with its immediate neighbours) for as long as the running total
    stays within ``all_max_len``.  Selected lines are sent in document order.

    Args:
        msg: The user's question.
        bot: Chat history as ``[question, answer]`` pairs; the new turn is
            appended to it.  ``None`` is treated as an empty history.
        doc_text_list: The document lines.
        doc_embeddings: Embeddings of ``doc_text_list``, one per line.

    Returns:
        The last three turns of the updated history (at most).
    """
    if bot is None:
        bot = []

    history, now_len = _history_window(msg, bot, history_max_len)
    req_json = {'question': msg, 'history': history}

    # Rank document lines by cosine similarity to the question, best first.
    query_embedding = embedder.encode([msg])
    cos_scores = util.cos_sim(query_embedding, doc_embeddings)[0]
    scores = [float(score) for score in cos_scores]
    ranked = sorted(range(len(scores)), key=scores.__getitem__, reverse=True)
    print('score_index:\n', [[scores[i], i] for i in ranked])

    picked = _context_indices(ranked, doc_text_list, now_len, all_max_len)
    req_json['doc'] = '\n'.join(doc_text_list[i] for i in picked)

    # The endpoint expects the request JSON-encoded inside a "content" field.
    data = {"content": json.dumps(req_json)}
    print('data:\n', req_json)
    result = requests.post(
        url=chat_url,
        data=json.dumps(data),
        headers=headers,
    )
    res = result.json()['content']

    bot.append([msg, res])
    return bot[max(0, len(bot) - 3):]
|
|
|
|
|
def _ocr_image_text(image):
    """Return the text CnOcr reads from one embedded PDF image ('' on failure)."""
    # The image is written to a temporary file that is always removed
    # afterwards, so uploads no longer leave PNG files behind.
    fd, tmp_path = tempfile.mkstemp(suffix='.png')
    try:
        with os.fdopen(fd, mode='wb') as fh:
            fh.write(image['stream'].get_data())
        hits = ocr.ocr(tmp_path)
    except Exception as exc:
        # OCR is best-effort: an unreadable image must not abort the upload.
        print('image OCR failed:', repr(exc))
        return ''
    finally:
        try:
            os.remove(tmp_path)
        except OSError:
            pass
    return ' '.join(hit['text'] for hit in hits)


def _table_rows(table):
    """Return one JSON string per data row of an extracted table."""
    # The first row is used as the column header, the rest as data.
    df = pd.DataFrame(table[1:], columns=table[0])
    try:
        records = json.loads(df.to_json(orient="records", force_ascii=False))
        return [json.dumps(rec, ensure_ascii=False) for rec in records]
    except Exception:
        # Fall back to the frame's plain-text rendering when it cannot be
        # serialised as records.
        return [str(df)]


def _page_lines(page):
    """Collect text lines, OCR'd image text and table rows from one page."""
    # extract_text() may yield None for a page without text.
    # NOTE(review): the last text line of every page is dropped, as in the
    # original code — presumably a footer/page number; confirm.
    lines = (page.extract_text() or '').split('\n')[:-1]

    for image in page.images:
        ocr_text = _ocr_image_text(image)
        if ocr_text:
            lines.append(ocr_text)

    for table in page.extract_tables():
        if table:
            lines.extend(_table_rows(table))

    return lines


def up_file(fls):
    """Extract the text of the uploaded PDF files for display and embedding.

    For every page, the extracted text lines, the OCR text of embedded
    images and the rows of extracted tables are collected; blank entries are
    dropped and the rest is joined with newlines.

    Args:
        fls: The uploaded file objects (each exposing ``.name``), or ``None``
            when the upload widget has been cleared.

    Returns:
        A 3-tuple of Gradio updates: the result textbox filled with the
        extracted text, the submit button made visible, and the status
        text set to "Processing".
    """
    doc_text_list = []

    # Only PDFs are read; other uploads are ignored, as before (they used to
    # be sorted into buckets that were never processed).
    names = [str(f.name) for f in (fls or [])]
    pdf_paths = [name for name in names if name.lower().endswith('pdf')]

    for path in pdf_paths:
        with pdfplumber.open(path) as document:
            for page in document.pages:
                doc_text_list += _page_lines(page)

    doc_text_list = [
        str(text).strip() for text in doc_text_list if len(str(text).strip()) > 0
    ]

    return (
        gr.Textbox.update(value='\n'.join(doc_text_list), visible=True),
        gr.Button.update(visible=True),
        gr.Markdown.update(value="Processing"),
    )
|
|
|
|
|
|
|
def get_answer(query_live):
    """Answer *query_live* with a 'stuff' question-answering chain.

    Documents similar to the query are fetched from ``docstore`` and handed
    to the chain together with the question.

    Args:
        query_live: The question to answer.

    Returns:
        The value produced by the chain's ``run`` call (previously computed
        and then discarded).
    """
    # No key is passed here: the `openai='aaa'` argument used before is not
    # the API-key parameter.  The wrapper is expected to pick the key up from
    # the OPENAI_API_KEY environment variable — TODO confirm for the
    # installed langchain version.
    llm = OpenAI(temperature=0)
    qa_chain = load_qa_chain(llm, chain_type='stuff')

    # NOTE(review): `docstore` is not defined anywhere in this file; it must
    # be provided at module level (presumably a vector store built on the
    # Pinecone index) before this function can run.
    docs = docstore.similarity_search(query_live)
    return qa_chain.run(input_documents=docs, question=query_live)
|
|
|
# UI: upload and extracted text in the left column, chat in the right one.
# Everything except the file picker and the status text starts hidden and is
# revealed by the callbacks.
with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column():
            file = gr.File(
                file_types=['.pdf'],
                label='Click to upload Document',
                file_count='multiple',
            )
            doc_bu = gr.Button(value='Submit', visible=False)
            txt = gr.Textbox(label='result', visible=False)

            # Per-session storage for the document lines and their embeddings.
            doc_text_state = gr.State([])
            doc_emb_state = gr.State([])

        with gr.Column():
            md = gr.Markdown("Please Upload the PDF")
            chat_bot = gr.Chatbot(visible=False)
            msg_txt = gr.Textbox(visible=False)
            chat_bu = gr.Button(value='Clear', visible=False)

    # Uploading extracts the text; submitting embeds it and shows the chat.
    file.change(up_file, inputs=[file], outputs=[txt, doc_bu, md])
    doc_bu.click(
        doc_emb,
        inputs=[txt],
        outputs=[doc_text_state, doc_emb_state, msg_txt, chat_bu, md, chat_bot],
    )

    # Sending a message answers it; the clear button resets the chatbot.
    msg_txt.submit(
        get_response,
        inputs=[msg_txt, chat_bot, doc_text_state, doc_emb_state],
        outputs=[chat_bot],
        queue=False,
    )
    chat_bu.click(fn=lambda: None, inputs=None, outputs=chat_bot, queue=False)
|
|
|
if __name__ == "__main__":
    # Turn on the request queue, then start the app with show_api disabled.
    queued_app = demo.queue()
    queued_app.launch(show_api=False)
|
|