|
"""## Import necessary libraries""" |
|
import os |
|
import shutil |
|
import json |
|
from langchain.document_loaders import PyPDFLoader |
|
from langchain.document_loaders import PyPDFDirectoryLoader |
|
from langchain.llms import OpenAI |
|
from langchain.prompts import PromptTemplate |
|
from langchain.chains import LLMChain |
|
from langchain.output_parsers import PydanticOutputParser |
|
from pydantic import BaseModel, Field |
|
from langchain.document_loaders import YoutubeLoader |
|
from langchain.document_loaders import WebBaseLoader |
|
from langchain.text_splitter import CharacterTextSplitter |
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
from langchain.embeddings.openai import OpenAIEmbeddings |
|
from langchain.vectorstores import Chroma |
|
from langchain.chat_models import ChatOpenAI |
|
from langchain.chains import RetrievalQA |
|
|
|
from google.oauth2 import service_account |
|
from google.cloud import translate_v2 as translate |
|
import gradio as gr |
|
|
|
"""## Access KEY""" |
|
|
|
service_account_info = json.loads(os.environ.get("SERVICE_ACCOUNT_FILE")) |
|
credentials = service_account.Credentials.from_service_account_info(service_account_info) |
|
|
|
""" ## Load PDF """ |
|
class LoadPdf: |
|
|
|
def __init__(self, pdf_file): |
|
if not self.is_pdf_file(pdf_file): |
|
raise gr.Error("Invalid file extension. Please load a PDF file") |
|
self.pdf_file = pdf_file |
|
|
|
def is_pdf_file(self, file_path): |
|
_, file_extension = os.path.splitext(file_path) |
|
return file_extension.lower() == ".pdf" |
|
|
|
def read_file(self): |
|
loader = PyPDFLoader(self.pdf_file) |
|
data = loader.load() |
|
return data |
|
|
|
"""## Request OpenAI""" |
|
class QuestionAnswer: |
|
|
|
def __init__(self, data, question, user_key): |
|
self.data = data |
|
self.question = question |
|
self.key = user_key |
|
|
|
def make_qa(self): |
|
|
|
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100) |
|
splits = text_splitter.split_documents(self.data) |
|
|
|
persist_directory = 'files/chroma/' |
|
|
|
embedding = OpenAIEmbeddings(openai_api_key=self.key) |
|
retriever = Chroma.from_documents(documents=splits, |
|
embedding=embedding, |
|
persist_directory=persist_directory).as_retriever() |
|
|
|
|
|
llm = ChatOpenAI(temperature=0.2, model="gpt-3.5-turbo-16k", openai_api_key=self.key) |
|
question_answer = RetrievalQA.from_chain_type(llm=llm, retriever=retriever) |
|
|
|
make_question = f'{self.question}' |
|
|
|
return question_answer.run(make_question) |
|
|
|
"""## Translation""" |
|
class TranslateOutput: |
|
|
|
def __init__(self, credentials): |
|
self.credentials = credentials |
|
|
|
def list_languages(self): |
|
client = translate.client.Client(credentials=self.credentials) |
|
languages = client.get_languages() |
|
language_names = [language['name'] for language in languages] |
|
return language_names |
|
|
|
def all_languages(self): |
|
client = translate.client.Client(credentials=self.credentials) |
|
languages = client.get_languages() |
|
return languages |
|
|
|
def translate_text(self, text, target_language): |
|
client = translate.client.Client(target_language=target_language, credentials=self.credentials) |
|
|
|
if isinstance(text, bytes): |
|
text = text.decode("utf-8") |
|
|
|
result = client.translate(text, target_language=target_language) |
|
return result["translatedText"] |
|
|
|
"""## Run QA """ |
|
def run_qa(files,checkboxes,question,language,user_key): |
|
|
|
|
|
if user_key is None: |
|
return 'Introduza OpenAI API KEY' |
|
|
|
full_filenames = [file.name for file in files] |
|
available_files = [os.path.basename(path) for path in full_filenames] |
|
chosen_files = checkboxes |
|
|
|
|
|
loadable_files = [file for file in available_files if file in chosen_files] |
|
|
|
|
|
print(f"=> Available Files: {str(available_files)}") |
|
print(f"=> Chosen Files: {str(chosen_files)}") |
|
print(f"=> Question for Files: {str(question)}") |
|
print(f"=> Language to use: {str(language)}") |
|
|
|
|
|
data='' |
|
|
|
for file in loadable_files: |
|
print(f"=> Loading chosen file: {str(file)}") |
|
pdf_loader = LoadPdf("pdfs/"+file) |
|
data = pdf_loader.read_file() |
|
|
|
|
|
qa = QuestionAnswer(data, question, user_key) |
|
answer_open_ai = qa.make_qa() |
|
|
|
|
|
language_selected = language |
|
translate_output = TranslateOutput(credentials) |
|
|
|
for i in translate_output.all_languages(): |
|
if i['name'] == language_selected: |
|
iso_code = i['language'] |
|
break |
|
|
|
print(f"=> Answer OpenAI: {answer_open_ai}") |
|
print(f"=> Target Language IsoCode: {iso_code}") |
|
|
|
answer = translate_output.translate_text(answer_open_ai, target_language=iso_code) |
|
print(f"=> Translated Answer OpenAI: {answer}") |
|
|
|
return answer |
|
|
|
|
|
def on_files_upload(files):
    """Gradio callback: copy uploaded PDFs into ./pdfs and refresh choices.

    *files* is a list of temp-file objects; each exposes its on-disk path via
    the ``name`` attribute (as the copyfile call below relies on).  Returns a
    CheckboxGroup update listing the uploaded base names.
    """
    files_dir = "pdfs"
    # exist_ok=True makes the original os.path.exists pre-check redundant.
    os.makedirs(files_dir, exist_ok=True)

    filenames = []
    for fileobj in files:
        # The original passed the file *object* to os.path.basename, which
        # raises TypeError; the path lives on fileobj.name.
        base = os.path.basename(fileobj.name)
        shutil.copyfile(fileobj.name, files_dir + "/" + base)
        filenames.append(base)

    return gr.CheckboxGroup(choices=filenames)
|
|
|
|
|
def on_files_cleared():
    """Gradio callback: delete the temporary PDF directory and empty the choices."""
    pdf_dir = "pdfs"
    if os.path.exists(pdf_dir):
        shutil.rmtree(pdf_dir)

    return gr.CheckboxGroup(choices=[])
|
|
|
|
|
title = "Question/Answer over Documents" |
|
subtitle = "OpenAI GPT 3.5 Turbo LLM assisted Question/Answer over multiple PDF context documents" |
|
authors = "Hugo Cavalaria " |
|
custom_layout = "<h1>{}</h1><h2>{}</h2><p>{}</p>".format(title,subtitle,authors) |
|
|
|
|
|
translate_output = TranslateOutput(credentials) |
|
language_names = [i for i in translate_output.list_languages()] |
|
|
|
|
|
with gr.Blocks() as interface:

    # Page header (title / subtitle / authors) across the top row.
    with gr.Row():
        with gr.Column(scale=2):
            gr.HTML(custom_layout)

    # Left column: all inputs; right column: the translated answer.
    with gr.Row():
        with gr.Column(scale=1):
            upload_pdfs = gr.Files(label="Upload multiple PDF files.", interactive=True, file_types=['.pdf'], container=True)
            # Starts empty; on_files_upload fills it with uploaded file names.
            checkbox_group = gr.CheckboxGroup(label="Select the files to question.", choices=[], interactive=True)
            question_text = gr.Textbox(label="Question:")
            # Choices come from the Google Translate language list fetched at startup.
            answer_language = gr.Dropdown(label="Answer translation to:", choices=language_names, value="Portuguese")
            secret_key = gr.Textbox(label="OpenAI API Key:")
        with gr.Column(scale=1):
            output_status = gr.Textbox(label="Answer:")

    btn = gr.Button("Ask")

    # Run the QA pipeline on click and show the translated answer.
    btn.click(fn=run_qa,
              inputs=[upload_pdfs,checkbox_group,question_text,answer_language,secret_key],
              outputs=[output_status])

    # Copy uploads into ./pdfs and refresh the file checkboxes.
    upload_pdfs.upload(fn=on_files_upload,
                       inputs=[upload_pdfs],
                       outputs=[checkbox_group],
                       show_progress="full")

    # Remove ./pdfs and clear the checkboxes when the upload area is cleared.
    upload_pdfs.clear(fn=on_files_cleared,
                      inputs=None,
                      outputs=[checkbox_group])
|
|
|
"""## Launch Interface""" |
|
|
|
if __name__ == "__main__": |
|
interface.launch(share=False, debug=True) |