""" Credit to Derek Thomas, derek@huggingface.co """ import subprocess subprocess.run(["pip", "install", "--upgrade", "transformers[torch,sentencepiece]==4.34.1"]) import logging from pathlib import Path from time import perf_counter import gradio as gr from jinja2 import Environment, FileSystemLoader from backend.query_llm import generate_hf, generate_openai from backend.semantic_search import tables, retrievers, trim, rerank_documents VECTOR_COLUMN_NAME = "embedding" TEXT_COLUMN_NAME = "text" proj_dir = Path(__file__).parent # Setting up the logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Set up the template environment with the templates directory env = Environment(loader=FileSystemLoader(proj_dir / 'templates')) # Load the templates directly from the environment template = env.get_template('template.j2') template_html = env.get_template('template_html.j2') # Examples examples = ['What is the capital of China?', 'Why is the sky blue?', 'Who won the mens world cup in 2014?', ] def add_text(history, text): history = [] if history is None else history history.append((text, None)) return history, gr.Textbox(value="", interactive=False) def api_call(history, api_kind, table_name, openai_key, rerank): last = None for output in bot(history, api_kind, table_name, openai_key, rerank): last = output return str(last[0][0][1])[:60000] def bot(history, api_kind, table_name, openai_key, rerank): top_k_rank = 4 query = history[-1][0] if not query: gr.Warning("Please submit a non-empty string as a prompt") raise ValueError("Empty string was submitted") if table_name not in tables: gr.Warning(f"Table name {table_name} is incorrect") raise ValueError(f"Table name {table_name} is incorrect") logger.warning('Retrieving documents...') logger.warning(f"{openai_key}") # Retrieve documents relevant to query document_start = perf_counter() retriever_name = table_name.split('_')[1] query_vec = retrievers[retriever_name](query, openai_key) documents = [] if rerank: # Search for 2x the documents and then rerank documents = tables[table_name].search(query_vec, vector_column_name=VECTOR_COLUMN_NAME).limit(top_k_rank * 2).to_list() documents = [doc[TEXT_COLUMN_NAME] for doc in documents] documents = rerank_documents(query, documents)[:top_k_rank] else: documents = tables[table_name].search(query_vec, vector_column_name=VECTOR_COLUMN_NAME).limit(top_k_rank).to_list() documents = [doc[TEXT_COLUMN_NAME] for doc in documents] document_time = perf_counter() - document_start logger.warning(f'Finished Retrieving documents in {round(document_time, 2)} seconds...') if api_kind == "HuggingFace": generate_fn = generate_hf elif api_kind == "OpenAI": max_length = 3000 generate_fn = lambda prompt, history: generate_openai(prompt, history, key = openai_key) # Trim the documents to fit into the context length documents = [trim(d, max_length // len(documents)) for d in documents] elif api_kind is None: gr.Warning("API name was not provided") raise ValueError("API name was not provided") else: gr.Warning(f"API {api_kind} is not supported") raise ValueError(f"API {api_kind} is not supported") # Create Prompt prompt = template.render(documents=documents, query=query) prompt_html = template_html.render(documents=documents, query=query) history[-1][1] = "" for character in generate_fn(prompt, history[:-1]): history[-1][1] = character yield history, prompt_html with gr.Blocks() as demo: chatbot = gr.Chatbot( [], elem_id="chatbot", avatar_images=('https://aui.atlassian.com/aui/8.8/docs/images/avatar-person.svg', 'https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.svg'), bubble_full_width=False, show_copy_button=True, show_share_button=True, ) with gr.Row(): txt = gr.Textbox( scale=3, show_label=False, placeholder="Enter text and press enter", container=False, ) txt_btn = gr.Button(value="Submit text", scale=1) api_kind = gr.Radio(choices=["HuggingFace", "OpenAI"], value="HuggingFace") table_name = gr.Radio(choices = list(sorted(tables.keys())), value = 'files_MiniLM') rerank = gr.Checkbox(value = False, label="Rerank using cross-encoders") openai_key = gr.Textbox(max_lines=1, value = 'Your API key here', label="OpenAI API key") prompt_html = gr.HTML() # Turn off interactivity while generating if you click txt_msg = txt_btn.click(add_text, [chatbot, txt], [chatbot, txt], queue=False).then( bot, [chatbot, api_kind, table_name, openai_key, rerank], [chatbot, prompt_html]) # Turn it back on txt_msg.then(lambda: gr.Textbox(interactive=True), None, [txt], queue=False) # Turn off interactivity while generating if you hit enter txt_msg = txt.submit(add_text, [chatbot, txt], [chatbot, txt], queue=False).then( bot, [chatbot, api_kind, table_name, openai_key, rerank], [chatbot, prompt_html]) # Turn it back on txt_msg.then(lambda: gr.Textbox(interactive=True), None, [txt], queue=False) # Examples gr.Examples(examples, txt) hidden_txt = gr.Textbox(visible=False) hidden = gr.Button(value="Ignore", visible=False) hidden.click(api_call, [chatbot, api_kind, table_name, openai_key, rerank], [hidden_txt]) demo.queue() demo.launch(debug=True)