|
import urllib.request |
|
import fitz |
|
import re |
|
import numpy as np |
|
import tensorflow_hub as hub |
|
import gradio as gr |
|
import os |
|
from sklearn.neighbors import NearestNeighbors |
|
import requests |
|
import tensorflow_text |
|
|
|
api_url="https://free.churchless.tech/v1/chat/completions" |
|
|
|
def download_pdf(url, output_path): |
|
urllib.request.urlretrieve(url, output_path) |
|
|
|
|
|
def preprocess(text): |
|
text = text.replace('\n', ' ') |
|
text = re.sub('\s+', ' ', text) |
|
return text |
|
|
|
|
|
def pdf_to_text(path, start_page=1, end_page=None): |
|
doc = fitz.open(path) |
|
total_pages = doc.page_count |
|
|
|
if end_page is None: |
|
end_page = total_pages |
|
|
|
text_list = [] |
|
|
|
for i in range(start_page-1, end_page): |
|
text = doc.load_page(i).get_text("text") |
|
text = preprocess(text) |
|
text_list.append(text) |
|
|
|
doc.close() |
|
return text_list |
|
|
|
|
|
def text_to_chunks(texts, word_length=150, start_page=1): |
|
text_toks = [t.split(' ') for t in texts] |
|
page_nums = [] |
|
chunks = [] |
|
|
|
for idx, words in enumerate(text_toks): |
|
for i in range(0, len(words), word_length): |
|
chunk = words[i:i+word_length] |
|
if (i+word_length) > len(words) and (len(chunk) < word_length) and ( |
|
len(text_toks) != (idx+1)): |
|
text_toks[idx+1] = chunk + text_toks[idx+1] |
|
continue |
|
chunk = ' '.join(chunk).strip() |
|
|
|
chunks.append(chunk) |
|
return chunks |
|
|
|
class SemanticSearch: |
|
|
|
def __init__(self): |
|
|
|
self.use = hub.load('https://tfhub.dev/google/universal-sentence-encoder-multilingual-large/3') |
|
|
|
self.fitted = False |
|
|
|
|
|
def fit(self, data, batch=1000, n_neighbors=2): |
|
self.data = data |
|
self.embeddings = self.get_text_embedding(data, batch=batch) |
|
n_neighbors = min(n_neighbors, len(self.embeddings)) |
|
self.nn = NearestNeighbors(n_neighbors=n_neighbors) |
|
self.nn.fit(self.embeddings) |
|
self.fitted = True |
|
|
|
|
|
def __call__(self, text, return_data=True): |
|
inp_emb = self.use([text]) |
|
neighbors = self.nn.kneighbors(inp_emb, return_distance=False)[0] |
|
|
|
if return_data: |
|
return [self.data[i] for i in neighbors] |
|
else: |
|
return neighbors |
|
|
|
|
|
def get_text_embedding(self, texts, batch=1000): |
|
embeddings = [] |
|
for i in range(0, len(texts), batch): |
|
text_batch = texts[i:(i+batch)] |
|
emb_batch = self.use(text_batch) |
|
embeddings.append(emb_batch) |
|
embeddings = np.vstack(embeddings) |
|
return embeddings |
|
|
|
|
|
|
|
def load_recommender(path, start_page=1): |
|
global recommender |
|
texts = pdf_to_text(path, start_page=start_page) |
|
chunks = text_to_chunks(texts, start_page=start_page) |
|
recommender.fit(chunks) |
|
return 'Corpus Loaded.' |
|
|
|
def generate_text(prompt): |
|
|
|
data = { |
|
"frequency_penalty": 0, |
|
"model": "gpt-3.5-turbo", |
|
"presence_penalty": 0, |
|
"temperature":1, |
|
"top_p": 1, |
|
"messages":[{"role":"system","content":"You are ChatGPT, a large language model trained by OpenAI.\nCarefully heed the user's instructions. \nRespond using Markdown."},{"role":"user", "content": prompt}] |
|
} |
|
|
|
|
|
r = requests.post(api_url, json = data) |
|
completions = r.json() |
|
message = completions.get("choices")[0].get("message").get("content") |
|
print(message) |
|
return message |
|
|
|
def generate_answer(question): |
|
topn_chunks = recommender(question) |
|
prompt = "" |
|
prompt += 'Результаты поиска:\n\n' |
|
for c in topn_chunks: |
|
prompt += c + '\n\n' |
|
|
|
prompt += ( |
|
"Инструкция: Составь исчерпывающий ответ на вопрос, используя приведенные результаты поиска. " |
|
"Включай только информацию, найденную в результатах поиска, и " |
|
"не добавляй никакой дополнительной информации. Убедись, что ответ правильный, и не выводи ложный контент. " |
|
"Если текст не относится к воросу, просто укажи 'Информация не найдена в этом документе'. Игнорируй " |
|
"результаты поиска, которые не имеют никакого отношения к вопросу. Отвечайте только на то, о чем тебя спрашивают. " |
|
"Ответ должен быть коротким и лаконичным. Отвечай шаг за шагом.\n\n" |
|
) |
|
|
|
prompt += f"Запрос: {question}\nОтвет:" |
|
print('prompt','->', prompt) |
|
answer = generate_text(prompt) |
|
return answer |
|
|
|
class Empty(): |
|
pass |
|
|
|
def question_answer(file, question): |
|
if file == None: |
|
load_recommender('/app/corpus.pdf') |
|
else: |
|
old_file_name = file.name |
|
file_name = file.name |
|
file_name = file_name[:-12] + file_name[-4:] |
|
os.rename(old_file_name, file_name) |
|
load_recommender(file_name) |
|
|
|
if question.strip() == '': |
|
return '[ERROR]: Question field is empty' |
|
|
|
return generate_answer(question) |
|
|
|
|
|
recommender = SemanticSearch() |
|
|
|
title = 'PDF GPT' |
|
description = """ PDF GPT позволяет вам общаться в чате с вашим PDF-файлом, используя Universal Sentence Encoder и Open AI""" |
|
|
|
with gr.Blocks() as demo: |
|
|
|
gr.Markdown(f'<center><h1>{title}</h1></center>') |
|
gr.Markdown(description) |
|
|
|
with gr.Row(): |
|
|
|
with gr.Group(): |
|
gr.Markdown("<center><h4><h4></center>") |
|
file = gr.File(label='Загрузите ваш PDF', file_types=['.pdf']) |
|
gr.Markdown("<center><h4><h4></center>") |
|
question = gr.Textbox(label='Введите ваш вопрос') |
|
btn = gr.Button(value='Submit') |
|
btn.style(full_width=True) |
|
|
|
with gr.Group(): |
|
answer = gr.Textbox(label='Ответ на ваш вопрос :') |
|
|
|
btn.click(question_answer, inputs=[file, question], outputs=[answer]) |
|
demo.launch() |