import gradio as gr
import datetime
import random
import time
import os
import shutil
import pandas as pd

# How to run the code ==> gradio gradio_llm_example.py
# from langchain import HuggingFacePipeline
# def load_llm_model(model: str = "google/flan-t5-large") -> HuggingFacePipeline:
#     llm = HuggingFacePipeline.from_model_id(
#         model_id=model,
#         task="text2text-generation",
#         model_kwargs={"max_length": 1500, "load_in_8bit": True},
#     )
#     return llm
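# Hedged sketch (not part of the original app): if the commented-out loader above is
# enabled, the resulting HuggingFacePipeline behaves like any LangChain LLM and can be
# called directly on a prompt. The prompt text below is an illustrative assumption only.
# llm = load_llm_model()
# answer = llm("What are the payment terms?")
# print(answer)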
# Define text and title information
title1 = "## 🤖 About QA App"
title2 = "## 💬 Chat with QA Bot"
title3 = "## 🔧 Toolbox"
title4 = "## ⚙️ Parameters"
title5 = "## 📊 Evaluation"
intro = """ Welcome! This is not just any bot: it is equipped with state-of-the-art natural language processing capabilities, ready to answer your queries with or without the support of additional documents.
**Ready to explore? Let's get started!** 🚀🚀
* Chat with the bot by typing any question you want and get your answers!
* You can load and select one or more documents to reinforce the bot's knowledge.
Don't forget to validate and update your selection according to your choices.
* You can customize the model by selecting advanced options in the toolbox."""
final_info = """ Made with ❤️ by us 🚀"""
# Define theme ==> see gr.themes.builder()
theme = gr.themes.Soft(
    primary_hue="emerald",
    neutral_hue="slate",
    text_size=gr.themes.sizes.text_md,
).set(
    body_text_color='*secondary_900',
    body_text_size='*text_lg',
    body_text_weight='500',
    border_color_accent='*secondary_950',
    link_text_color='*secondary_300',
    block_border_color='*neutral_200',
    block_border_width='*block_label_border_width',
    block_label_background_fill='*primary_200',
    block_title_text_color='*primary_300',
    checkbox_border_color='*primary_300',
    checkbox_border_color_selected_dark='*secondary_200',
    button_primary_border_color='*primary_300'
)
def upload_file(files_obj):
    """ Upload several files from drag and drop and save them in a local temp folder.
    files_obj (type: list): list of tempfile._TemporaryFileWrapper
    Returns the checkbox group and action buttons used to display the uploaded documents. """
    temp_file_path = "./temp"
    # Create the local folder if needed
    if not os.path.exists(temp_file_path):
        os.makedirs(temp_file_path)
    # Save each file among the list of given files
    file_name_list = list()
    for file_obj in files_obj:
        file_name = os.path.basename(file_obj.name)
        file_name_list.append(file_name)
        shutil.copyfile(file_obj.name, os.path.join(temp_file_path, file_name))
    # Return visible components for the next selection step
    return {uploaded_check: gr.CheckboxGroup(choices=file_name_list, visible=True),
            choose_btn: gr.Button(visible=True),
            clear_folder_btn: gr.Button(visible=True)}
def read_PDFcontent(content, files_name):
    """ Read the selected files from the temp folder and update the content variable (state).
    Returns the updated content_var (type: list of str)
    and a visible error_box displaying the per-file status. """
    content_list = list()
    text_list = list()
    # Parse one or several docs among the selected ones
    temp_file_path = "./temp"
    for file_name in files_name:
        file_path = os.path.join(temp_file_path, file_name)
        # Read doc
        with open(file_path, "rb") as file:
            try:
                content = file.read()
                #### YOUR FUNCTION FOR CONTENT EXTRACTION ==> must return a str
                my_content = str(content[:10])
                content_list.append(my_content)
                text_list.append(f" {file_name} : ready ✅ \n ")
                # print(content)
            except Exception as e:
                print(f"Error occurred while reading the file: {e}")
                text_list.append(f" {file_name} : error ❌ \n")
    return {content_var: content_list,
            error_box: gr.Textbox(value=f"""{"".join(text_list)} """, visible=True)}
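# Hedged sketch (assumption, not part of the original app): the placeholder above only
# keeps the first bytes of each file. A real PDF text extraction could use pypdf, if it
# is installed; "extract_pdf_text" is a hypothetical helper name for illustration.
# from pypdf import PdfReader
#
# def extract_pdf_text(file_path):
#     """Return the concatenated text of all pages of a PDF as a single string."""
#     reader = PdfReader(file_path)
#     return "\n".join(page.extract_text() or "" for page in reader.pages)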
def clear_folder():
    """ Remove the temp folder and reset the document-related components. """
    temp_file_path = "./temp"
    shutil.rmtree(temp_file_path, ignore_errors=True)
    return {uploaded_check: gr.CheckboxGroup(choices=[], visible=False),
            error_box: gr.Textbox("", visible=False),
            choose_btn: gr.Button(visible=False),
            clear_folder_btn: gr.Button(visible=False),
            context_box: gr.Textbox(""),
            content_var: []}
# def write_content(chat_history, download_counter):
#     temp_file_path = "./Download_chat"
#     if not os.path.exists(temp_file_path):
#         os.makedirs(temp_file_path)
#     file_name = str(download_counter) + "chat_Conversation.txt"
#     file_path = os.path.join(temp_file_path, file_name)
#     # Write doc
#     with open(file_path, "w") as file:
#         for query_answer in chat_history:
#             file.write(" \n ".join(query_answer))
#             file.write(" \n ")
#     new_count = int(download_counter) + 1
#     return {download_counter: gr.Number(new_count, visible=False)}
### YOUR model, using the same inputs and returning the same outputs
def my_model(message, chat_history, content_var,
             language_choice, model_choice, max_length, temperature,
             num_return_sequences, top_p, no_repeat_ngram_size):
    # No LLM here, just respond with a pre-made message
    if content_var == []:
        bot_message = f"No context: {content_var} " + "I'm not interested"
    else:
        bot_message = f"Here is the context: {content_var}"
    chat_history.append((message, bot_message))
    return "", chat_history, gr.Textbox(visible=True, value=f'{" and ".join(content_var)}')
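# Hedged sketch (assumption, not the original author's model): my_model could be backed
# by a Hugging Face transformers pipeline instead of the canned reply. The model name and
# the way the context is stitched into the prompt are illustrative choices only.
# from transformers import pipeline
#
# generator = pipeline("text2text-generation", model="google/flan-t5-large")
#
# def my_llm_answer(message, content_list, max_length=50):
#     prompt = f"Context: {' '.join(content_list)}\nQuestion: {message}\nAnswer:"
#     result = generator(prompt, max_length=max_length)
#     return result[0]["generated_text"]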
def evaluate_my_model(answer):
    # Reset the answer box and show the update timestamp
    return {true_answer: gr.Textbox(value=""),
            updated_check: gr.Button(f"Model updated ✅ at : {datetime.datetime.now().strftime('%H:%M:%S')} ",
                                     visible=True)}
def display_my_metrics(metric_csv="./metrics.csv"):
    df = pd.read_csv(metric_csv)
    # df_metrics[0] = ["Model", "P", "R", "F1", "⏳"]
    return {df_metrics: gr.DataFrame(df, visible=True)}
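# Hedged example (assumption): display_my_metrics only needs a CSV that pandas can read;
# the column names and values below are illustrative, not the project's real metrics file.
# ./metrics.csv could look like:
#   Model,P,R
#   flan-t5-large,0.81,0.77
#   baseline,0.74,0.70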
def clear_all():
    return {msg: gr.Textbox(""),
            chatbot: gr.Chatbot([]),
            context_box: gr.Textbox("")}
def queue_bot(history):
    """ For in-progress display during chat: stream the last bot message character by character. """
    bot_message = history[-1][1]
    history[-1][1] = ""
    for character in bot_message:
        history[-1][1] += character
        time.sleep(0.005)
        yield history
# Params
temp_file_path = "./temp"

# App
with gr.Blocks(theme=theme) as gradioApp:
    # Initialize the document context variable as empty, without any drag and drop
    content_var = gr.State([])
    download_counter = gr.Number(0, visible=False)
    # Layout
    gr.Markdown(""" <img src="file/logo_neovision.png" alt="logo" style="width:350px;"/>""")
    with gr.Column():
        gr.Markdown(title1)
        gr.Markdown(intro)
        # gr.Markdown(final_info)
    # Row 1 : Intro + Param
    with gr.Row(equal_height=True):
        # with gr.Column(min_width=80, scale=0):
        #     gr.Markdown(" ")
        # Row 2 : Chat
        with gr.Column(min_width=300, scale=3):
            title2_gr = gr.Markdown(title2)
            chatbot = gr.Chatbot(label="Bot", height=300)
            msg = gr.Textbox(label="User", placeholder="Ask any question.")
            with gr.Row():
                with gr.Column():
                    # clear = gr.ClearButton(components=[msg, chatbot, context_box], value="Clear console")
                    clear = gr.Button(value="🗑️ Clear console")
                    download = gr.Button(value="📩 Download chat")
                    updated_download = gr.Button(f"Last download ✅ at : {datetime.datetime.now().strftime('%H:%M:%S')} ",
                                                 visible=False)
                with gr.Column():
                    upload_button = gr.UploadButton("📁 Browse files",
                                                    file_types=[".pdf"], file_count="multiple")
                    uploaded_check = gr.CheckboxGroup(label="📄 Uploaded documents", visible=False,
                                                      info="Do you want to use a supporting document?")
                    with gr.Row():
                        choose_btn = gr.Button(value="🖱️ Choose docs", visible=False)
                        clear_folder_btn = gr.Button(value="🗑️ Clear docs", visible=False)
            # upload_iface = gr.Interface(fn=upload_file,
            #                             inputs=gr.File(file_count="multiple", file_types=["pdf"]),
            #                             outputs=[uploaded_check],
            #                             description="📁 Browse files", allow_flagging="never")
            error_box = gr.Textbox(label="Files status... ", visible=False)  # displayed only when ready or on error
        # Row 3 : Toolbox
        with gr.Column(min_width=100, scale=1):
            gr.Markdown(title4)
            with gr.Accordion(label="Select advanced options", open=False):
                model_choice = gr.Dropdown(["LLM", "Other"], label="Model", info="Choose your AI model")
                language_choice = gr.Dropdown(["English", "French"], label="Language", info="Choose your language")
                max_length = gr.Slider(label="Token length", minimum=1, maximum=100, value=50, step=1)
                temperature = gr.Slider(label="Temperature", minimum=0.1, maximum=1, value=0.8, step=0.1)
                num_return_sequences = gr.Slider(label="Return sequences", minimum=1, maximum=50, value=1, step=1)
                top_p = gr.Slider(label="Top p", minimum=0.1, maximum=1, value=0.8, step=0.1)
                no_repeat_ngram_size = gr.Slider(label="No-repeat n-gram size", minimum=1, maximum=10, value=3, step=1)
        # Evaluate
        with gr.Column(min_width=100, scale=1):
            gr.Markdown(title5)
            with gr.Accordion(label="Evaluate the model", open=True):
                context_box = gr.Textbox(label="📝 Context",
                                         placeholder="The context used")
                true_answer = gr.Textbox(lines=2, label="📝 Right answer",
                                         placeholder="Give an example of a right answer and update the model")
                update_btn = gr.Button(value="Update ⚠️ 🔄")
                updated_check = gr.Button(value="", visible=False)
            df_metrics = gr.DataFrame(row_count=(5, "fixed"), col_count=(3, "fixed"), visible=False)
    ###### Chatbot ######
    # Flag
    logger = gr.CSVLogger()
    logger.setup([chatbot], "Download_flagged")
    # hf_writer = gr.HuggingFaceDatasetSaver(HF_TOKEN, "crowdsourced-calculator-demo")
    download.click(lambda *args: logger.flag(args), [chatbot], None, preprocess=False)
    download.click(lambda: {updated_download: gr.Button(visible=True)}, None, updated_download, preprocess=False)
    # YOUR MODEL TO ADAPT
    msg.submit(my_model, inputs=[msg, chatbot, content_var,
                                 language_choice, model_choice, max_length, temperature,
                                 num_return_sequences, top_p, no_repeat_ngram_size],
               outputs=[msg, chatbot, context_box]).then(queue_bot, chatbot, chatbot)
    # Chatbot clear
    clear.click(clear_all, inputs=[], outputs=[msg, chatbot, context_box])
    # Chatbot examples (clicking an example fills the question box)
    example = gr.Examples(examples=[["What are the payment terms?"],
                                    ["Do I become the owner of the developments made?"],
                                    ["Can Neovision use a subcontractor and if so, under what conditions?"],
                                    ["What are the termination conditions?"]],
                          inputs=[msg])
    ###### DOCS ######
    # Upload one or several docs and display the other buttons
    upload_button.upload(upload_file, [upload_button], [uploaded_check, choose_btn, clear_folder_btn])
    # Read the selected documents
    choose_btn.click(read_PDFcontent, inputs=[content_var, uploaded_check], outputs=[content_var, error_box])
    # Clear the temp folder and reset the document components
    clear_folder_btn.click(clear_folder, inputs=[],
                           outputs=[uploaded_check, error_box, choose_btn, clear_folder_btn,
                                    context_box, content_var])
    # Evaluate and update the model
    update_btn.click(evaluate_my_model, inputs=true_answer, outputs=[true_answer, updated_check])
    update_btn.click(display_my_metrics, inputs=None, outputs=df_metrics)
gr.close_all()
gradioApp.queue()
gradioApp.launch(share=True)
# To protect the app, add auth=("neovision", "gradio2023") to the launch() parameters above.