Spaces:
Sleeping
Sleeping
import time | |
import chromadb | |
from chromadb.utils import embedding_functions | |
from test.new import connect_to_llama | |
# from transformers import pipeline | |
import gradio as gr | |
import PyPDF2 | |
import os | |
from chunkipy.text_chunker import split_by_sentences | |
import langid | |
from translate import Translator | |
chroma_client = chromadb.PersistentClient() | |
from test.llama import llama_local | |
working_dir = os.getcwd() | |
# checkpoint = f"{working_dir}/LaMini-T5-738M" | |
# model = pipeline('text2text-generation', model=checkpoint) | |
# input_prompt = """Answer the following question related reasoning answers from the following contexts that is given ..Don't generate answer from your data generate only from the provided contexts | |
# ..If the contexts doesn't provide an answer or isn't related to the question, respond with "there is no answer for the provided question" | |
# Question:"{}", | |
# Contexts:"{}" | |
# Answer: | |
# """ | |
def detect_and_translate_query(query, context, dest_language='en'): | |
input_language, _ = langid.classify(query) | |
if isinstance(context, list): | |
context = " ".join(context) | |
translator = Translator(to_lang=dest_language, from_lang=input_language) | |
translated_query = translator.translate(query) | |
translated_context = translator.translate(context) | |
return translated_query, translated_context, input_language | |
def translate_response(response, source_language, dest_language): | |
translator = Translator(to_lang=source_language, from_lang=dest_language) | |
translated_response = translator.translate(response) | |
print("translate_response "+str(translate_response)) | |
return translated_response | |
def create_multiple_db(path,collection,working_dir): | |
filelist = os.listdir(path) | |
print(filelist) | |
data_pdfs = [] | |
metadata_buff=[] | |
for file_n in filelist: | |
with open(file_n, 'rb') as file: | |
pdf_reader = PyPDF2.PdfReader(file) | |
meta_data=dict(pdf_reader.metadata) | |
print("De elmeta data before: ",meta_data) | |
meta_data.update({"/Title":file_n}) | |
print("De elmeta data after: ", meta_data) | |
metadata_buff.append(meta_data) | |
data = "" | |
for page_num in range(len(pdf_reader.pages)): | |
data += pdf_reader.pages[page_num].extract_text() | |
chunk = split_by_sentences(data) | |
for i, chunks in enumerate(chunk): | |
print(f"chunks{i}:", chunks) | |
data_pdfs.append(chunk) | |
file.close() | |
os.chdir(working_dir) | |
print(metadata_buff,"\n",len(metadata_buff)) | |
sentence_transformer_ef = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="all-MiniLM-L6-v2") | |
i = 0 | |
md_i=0 | |
for data in data_pdfs: | |
print(data) | |
collection.add( | |
documents=data, | |
embeddings=sentence_transformer_ef(data), | |
ids=['id' + str(x + i) for x in range(len(data))], | |
metadatas=[metadata_buff[md_i]for i in range(len(data))] | |
) | |
md_i+=1 | |
i += len(data) | |
return "done" | |
def architecture_with_chroma(data): | |
try: | |
data_dict = eval(data) | |
except: | |
return "please enter a valid json (dict) to process" | |
id = data_dict.get('id') | |
if id is None: | |
return "please enter an id to process on the prompt" | |
id = "mate" + str(id) | |
query = data_dict.get('query') | |
if query is None or query == "": | |
return "please enter a query to process" | |
collection = chroma_client.get_or_create_collection(name=id) | |
results = collection.query( | |
query_texts=[query], | |
n_results=5 | |
) | |
context = results.get('documents')[0] | |
results_metadata = list(results.get("metadatas")[0]) | |
results_documents = list(results.get("documents")[0]) | |
for i in range(5): | |
results_documents[i] = f"In {results_metadata[i].get('/Title')}:" + results_documents[i] | |
for data in results_documents: | |
print(data) | |
print(context) | |
# generated_text = model(input_prompt.format(query+"? answer reasoning answers from the provided contexts only that is related and contains this information ", context), max_length=1024, do_sample=False)[0]['generated_text'] | |
# print(input_prompt) | |
chroma_client.stop() | |
translated_query, translated_context, input_language = detect_and_translate_query(query, context) | |
print('translated_query '+str(translated_query)) | |
print('translated_context '+str(translated_context)) | |
results=connect_to_llama(query,results_documents) | |
# results=llama_local(query,results_documents) | |
translated_response = translate_response(results, input_language, dest_language='en') | |
return translated_response | |
# return results | |
# return generated_text | |
def create(data): | |
print(data) | |
print(type(data)) | |
try: | |
dict=eval(data) | |
except: | |
return "please enter a valid json (dict) to process" | |
id=dict.get('id') | |
if id==None : | |
return "please enter an id to process on the prompt" | |
id="mate"+str(id) | |
if(not os.path.exists(id)): | |
return "sorry ,there is no directory for this client" | |
else: | |
chroma_client.delete_collection(name=id) | |
collection = chroma_client.get_or_create_collection(name=id) | |
print(os.chdir(id)) | |
return create_multiple_db(os.getcwd(),collection,working_dir)+" making data for client" | |
def update(data): | |
print(data) | |
print(type(data)) | |
try: | |
dict=eval(data) | |
except: | |
return "please enter a valid json (dict) to process" | |
id=dict.get('id') | |
if id==None : | |
return "please enter an id to process on the prompt" | |
id="mate"+str(dict.get('id')) | |
if(not os.path.exists(id)): | |
return "sorry ,there is no directory for this client" | |
else: | |
chroma_client.delete_collection(name=id) | |
collection=chroma_client.create_collection(name=id) | |
print(os.chdir(id)) | |
return create_multiple_db(os.getcwd(),collection,working_dir)+"updating client embeddings" | |
iface = gr.Blocks() | |
with iface: | |
name = gr.Textbox(label="Name") | |
output = gr.Textbox(label="Output Box") | |
process_btn = gr.Button("process") | |
process_btn.click(fn=architecture_with_chroma, inputs=name, outputs=output, api_name="process") | |
create_btn = gr.Button("create") | |
create_btn.click(fn=create, inputs=name, outputs=output, api_name="create") | |
update_btn = gr.Button("update") | |
update_btn.click(fn=update, inputs=name, outputs=output, api_name="update") | |
iface.launch() | |