import gradio as gr
import fitz  # PyMuPDF
import re
import numpy as np
from sentence_transformers import SentenceTransformer
import faiss
from langdetect import detect
from translate import Translator
from tqdm import tqdm
import openai
import os
import urllib.request
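
# Assumed dependencies (not pinned in the original); roughly:
#   pip install gradio pymupdf sentence-transformers faiss-cpu langdetect translate openai tqdm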


# Module-level state shared across the app: the text chunks of the currently
# loaded document and the FAISS index built over them (populated in question_answer).
chunks = []
index = None


def download_pdf(url, output_path):
    """Download a PDF from a URL to a local file."""
    urllib.request.urlretrieve(url, output_path)


def preprocess(text):
    """Collapse newlines and repeated whitespace into single spaces."""
    text = text.replace('\n', ' ')
    text = re.sub(r'\s+', ' ', text)
    return text


def pdf_to_text(path, start_page=1, end_page=None):
    """Extract and preprocess the text of each page in the given range."""
    doc = fitz.open(path)
    total_pages = doc.page_count

    if end_page is None or end_page > total_pages:
        end_page = total_pages

    text_list = []

    for i in tqdm(range(start_page - 1, end_page), desc="Extracting text from PDF"):
        text = doc.load_page(i).get_text("text")
        text = preprocess(text)
        text_list.append(text)

    doc.close()
    return text_list


def text_to_chunks(texts, word_length=150, start_page=1):
    """Split page texts into ~word_length-word chunks tagged with their page number."""
    chunks = []
    buffer = []

    for idx, text in enumerate(texts):
        words = text.split(' ')
        for word in words:
            buffer.append(word)
            if len(buffer) >= word_length:
                chunk = ' '.join(buffer).strip()
                chunks.append(f'Page {idx + start_page}: "{chunk}"')
                buffer = []

    # Flush whatever remains so trailing words are not silently dropped.
    if buffer:
        chunk = ' '.join(buffer).strip()
        chunks.append(f'Page {len(texts) - 1 + start_page}: "{chunk}"')

    return chunks


# Sentence-embedding model, loaded once at import time.
# 'all-MiniLM-L6-v2' produces 384-dimensional embeddings.
model = SentenceTransformer('all-MiniLM-L6-v2')


def search(query, k=5):
    """Return the k chunks most relevant to the query, plus the query's detected language."""
    original_language = detect_lang(query)
    query_in_english = translate_to_english(query) if original_language != 'en' else query

    query_embedding = model.encode([query_in_english])[0].astype(np.float32)
    distances, indices = index.search(np.array([query_embedding]), k)

    relevant_chunks = [chunks[idx] for idx in indices[0]]
    return relevant_chunks, original_language


def safe_translate(text, from_lang, to_lang, max_length=500):
    """Translate text in segments of at most max_length characters,
    since the underlying translation provider limits request length."""
    translator = Translator(to_lang=to_lang, from_lang=from_lang)

    # Split the text into word-aligned segments no longer than max_length characters.
    words = text.split()
    segments = []
    current_segment = []
    current_length = 0

    for word in words:
        if current_length + len(word) + 1 > max_length:
            segments.append(" ".join(current_segment))
            current_segment = [word]
            current_length = len(word)
        else:
            current_segment.append(word)
            current_length += len(word) + 1

    if current_segment:
        segments.append(" ".join(current_segment))

    # Translate each segment and rejoin the results.
    translated_segments = [translator.translate(segment) for segment in segments]
    translated_text = " ".join(translated_segments)
    return translated_text


def detect_lang(text):
    return detect(text)


def translate_to_english(text, max_length=500):
    detected_language = detect_lang(text)
    if detected_language != 'en':
        return safe_translate(text, from_lang=detected_language, to_lang='en', max_length=max_length)
    return text


def translate_from_english(text, target_lang, max_length=500):
    if target_lang != 'en':
        return safe_translate(text, from_lang='en', to_lang=target_lang, max_length=max_length)
    return text


# The OpenAI client reads its key from the OPENAI_API_KEY environment variable
# rather than having the secret hard-coded in the source.
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))


def generate_response_from_chunks(user_query, max_tokens=325):
    """Retrieve relevant chunks and ask the chat model for a cited answer."""
    relevant_chunks, original_language = search(user_query)

    if original_language != 'en':
        translated_query_to_english = translate_to_english(user_query)
    else:
        translated_query_to_english = user_query

    prompt = "search results:\n\n" + "".join([f"{i+1}. {chunk}\n\n" for i, chunk in enumerate(relevant_chunks)])
    prompt += (
        "Instructions: Compose a comprehensive and succinct reply to the query using the search results given. "
        "Cite each reference using [Page #number] notation (every result has a number at the beginning). "
        "Citation should be done at the end of each sentence. If the search results mention multiple subjects "
        "with the same name, create separate answers for each. Only include information found in the results and "
        "don't add any additional information. Make sure the answer is correct and don't output false content. "
        "You should also mention where a given answer might be found in the text if appropriate. "
        "Keep answers under around seven sentences. "
        "If the text does not relate to the query, simply state 'Sorry, Lil' Dewey found nothing relevant in the text.' "
        "Don't write 'Answer:'. Directly start and state the answer.\n"
    )

    prompt += f"Query: {translated_query_to_english}\n\n"

    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": prompt},
            {"role": "user", "content": "Please provide a response based on the above instructions."},
        ],
        temperature=0.7,
        max_tokens=max_tokens,
        top_p=1.0,
        frequency_penalty=0.0,
        presence_penalty=0.0,
    )

    generated_text = response.choices[0].message.content.strip()

    # Translate the answer back into the language of the original question if needed.
    translated_response = translate_from_english(generated_text, original_language) if original_language != 'en' else generated_text

    return translated_response


def question_answer(url, file, question):
    global chunks, index

    if url.strip() == '' and file is None:
        return '[ERROR]: Both URL and PDF are empty. Provide at least one.'

    if url.strip() != '' and file is not None:
        return '[ERROR]: Both URL and PDF are provided. Please provide only one (either URL or PDF).'

    if question.strip() == '':
        return '[ERROR]: Question field is empty.'

    file_path = 'temp_document.pdf'

    if url.strip() != '':
        download_pdf(url, file_path)
    else:
        # Gradio's File component supplies the upload already saved to disk,
        # either as a path string or as an object exposing .name, depending on the version.
        file_path = file if isinstance(file, str) else file.name

    # Extract text, chunk it, and build the FAISS index that search() will use.
    texts = pdf_to_text(file_path)
    chunks = text_to_chunks(texts)

    embeddings = model.encode(chunks, show_progress_bar=True)

    dimension = embeddings.shape[1]
    index = faiss.IndexFlatL2(dimension)
    index.add(embeddings.astype(np.float32))

    response = generate_response_from_chunks(question)
    return response


title = 'PDF Chatbot with Translation Features'
description = """This tool allows you to upload a PDF document or provide a URL to one, ask questions about its contents, and receive answers. It incorporates translation features, making it possible to ask questions in any language and receive responses in that language."""


with gr.Blocks() as demo:
    gr.Markdown(f'<center><h1>{title}</h1></center>')
    gr.Markdown(description)

    with gr.Row():
        with gr.Column(scale=1):
            url = gr.Textbox(label='URL')
            gr.Markdown("<center>or</center>", elem_id="markdown_or")
            file = gr.File(label='PDF', file_types=['.pdf'])
            question = gr.Textbox(label='Question')
            submit_btn = gr.Button(value='Submit')

        with gr.Column(scale=1):
            answer = gr.Textbox(label='Answer')

    submit_btn.click(fn=question_answer, inputs=[url, file, question], outputs=[answer])


demo.launch()
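# Note: demo.launch(share=True) would additionally expose a temporary public URL;
# left at the default (local only) here.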