""" Credit to Derek Thomas, derek@huggingface.co """ import subprocess # subprocess.run(["pip", "install", "--upgrade", "transformers[torch,sentencepiece]==4.34.1"]) import logging from pathlib import Path from time import perf_counter import gradio as gr from jinja2 import Environment, FileSystemLoader import numpy as np from sentence_transformers import CrossEncoder from backend.query_llm import generate_hf, generate_openai from backend.semantic_search import table, retriever VECTOR_COLUMN_NAME = "vector" TEXT_COLUMN_NAME = "text" proj_dir = Path(__file__).parent # Setting up the logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Set up the template environment with the templates directory env = Environment(loader=FileSystemLoader(proj_dir / 'templates')) # Load the templates directly from the environment template = env.get_template('template.j2') template_html = env.get_template('template_html.j2') # crossEncoder cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2') #cross_encoder = CrossEncoder('BAAI/bge-reranker-base') # Examples examples = ['My transhipment cargo is missing', 'What are benefits of the AEO Scheme and eligibility criteria?', 'What are penalties for customs offences? ', ] def add_text(history, text): history = [] if history is None else history history = history + [(text, None)] return history, gr.Textbox(value="", interactive=False) def bot(history, api_kind): top_rerank = 15 top_k_rank = 5 query = history[-1][0] print('history[-1][0]',history[-1][0]) if not query: gr.Warning("Please submit a non-empty string as a prompt") raise ValueError("Empty string was submitted") logger.warning('Retrieving documents...') # Retrieve documents relevant to query document_start = perf_counter() query_vec = retriever.encode(query) print(query) query_vec_flat = [arr.flatten() for arr in query_vec] logger.warning(f'Finished query vec') #documents = table.search(query_vec_flat, vector_column_name=VECTOR_COLUMN_NAME).limit(top_k_rank).to_list() logger.warning(f'Finished search') documents = table.search(query_vec, vector_column_name=VECTOR_COLUMN_NAME).limit(top_rerank).to_list() documents = [doc[TEXT_COLUMN_NAME] for doc in documents] logger.warning(f'start cross encoder {len(documents)}') # Retrieve documents relevant to query query_doc_pair = [[query, doc] for doc in documents] cross_scores = cross_encoder.predict(query_doc_pair) sim_scores_argsort = list(reversed(np.argsort(cross_scores))) logger.warning(f'Finished cross encoder {len(documents)}') documents = [documents[idx] for idx in sim_scores_argsort[:top_k_rank]] logger.warning(f'num documents {len(documents)}') document_time = perf_counter() - document_start logger.warning(f'Finished Retrieving documents in {round(document_time, 2)} seconds...') # Create Prompt prompt = template.render(documents=documents, query=query) prompt_html = template_html.render(documents=documents, query=query) if api_kind == "HuggingFace": generate_fn = generate_hf elif api_kind == "OpenAI": generate_fn = generate_openai elif api_kind is None: gr.Warning("API name was not provided") raise ValueError("API name was not provided") else: gr.Warning(f"API {api_kind} is not supported") raise ValueError(f"API {api_kind} is not supported") try: count_tokens = lambda text: len([token.strip() for token in text.split() if token.strip()]) print(prompt_html,'token count is',count_tokens(prompt_html)) history[-1][1] = "" for character in generate_fn(prompt, history[:-1]): history[-1][1] = character yield history, prompt_html except Exception as e: # Catch any exception print('An unexpected error occurred during generation:', str(e)) yield f"An unexpected error occurred during generation: {str(e)}" with gr.Blocks(theme='WeixuanYuan/Soft_dark') as demo: # Beautiful heading with logo gr.HTML(value="""
A free chat bot developed by National Customs Targeting Center using Open source LLMs.(Dedicated to 75th Batch IRS Probationers)
""", elem_id="description") chatbot = gr.Chatbot( [], elem_id="chatbot", avatar_images=('https://aui.atlassian.com/aui/8.8/docs/images/avatar-person.svg', 'https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.svg'), bubble_full_width=False, show_copy_button=True, show_share_button=True, ) with gr.Row(): txt = gr.Textbox( scale=3, show_label=False, placeholder="Enter text and press enter", container=False, ) txt_btn = gr.Button(value="Submit text", scale=1) api_kind = gr.Radio(choices=["HuggingFace"], value="HuggingFace") prompt_html = gr.HTML() try: # Turn off interactivity while generating if you click txt_msg = txt_btn.click(add_text, [chatbot, txt], [chatbot, txt], queue=False).then( bot, [chatbot, api_kind], [chatbot, prompt_html]) except Exception as e: print ('Exception txt btn click ' ,str(e)) # Turn it back on txt_msg.then(lambda: gr.Textbox(interactive=True), None, [txt], queue=False) try: # Turn off interactivity while generating if you hit enter txt_msg = txt.submit(add_text, [chatbot, txt], [chatbot, txt], queue=False).then( bot, [chatbot, api_kind], [chatbot, prompt_html]) except Exception as e: print ('Exception ' ,str(e)) # Turn it back on txt_msg.then(lambda: gr.Textbox(interactive=True), None, [txt], queue=False) # Examples gr.Examples(examples, txt) demo.queue() demo.launch(debug=True)