Spaces:
Sleeping
Sleeping
| """ | |
| Credit to Derek Thomas, derek@huggingface.co | |
| """ | |
| import subprocess | |
| # subprocess.run(["pip", "install", "--upgrade", "transformers[torch,sentencepiece]==4.34.1"]) | |
| import logging | |
| from pathlib import Path | |
| from time import perf_counter | |
| import gradio as gr | |
| from jinja2 import Environment, FileSystemLoader | |
| import numpy as np | |
| from sentence_transformers import CrossEncoder | |
| from backend.query_llm import generate_hf, generate_openai,generate_gemini | |
| from backend.semantic_search import table, retriever | |
| VECTOR_COLUMN_NAME = "vector" | |
| TEXT_COLUMN_NAME = "text" | |
| proj_dir = Path(__file__).parent | |
| # Setting up the logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Set up the template environment with the templates directory | |
| env = Environment(loader=FileSystemLoader(proj_dir / 'templates')) | |
| # Load the templates directly from the environment | |
| template = env.get_template('template.j2') | |
| template_html = env.get_template('template_html.j2') | |
| # crossEncoder | |
| cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2') | |
| #cross_encoder = CrossEncoder('BAAI/bge-reranker-base') | |
| # Examples | |
| examples = ['My transhipment cargo is missing', | |
| 'What are benefits of the AEO Scheme and eligibility criteria?', | |
| 'What are penalties for customs offences? ', ] | |
| def add_text(history, text): | |
| history = [] if history is None else history | |
| history = history + [(text, None)] | |
| print('add_text function done..returning history' ,history) | |
| return history, gr.Textbox(value="", interactive=False) | |
| def bot(history, api_kind): | |
| top_rerank = 15 | |
| top_k_rank = 5 | |
| query = history[-1][0] | |
| print('history[-1][0]',history[-1][0]) | |
| print('api kind ',api_kind) | |
| if not query: | |
| gr.Warning("Please submit a non-empty string as a prompt") | |
| raise ValueError("Empty string was submitted") | |
| logger.warning('Retrieving documents...') | |
| # Retrieve documents relevant to query | |
| document_start = perf_counter() | |
| query_vec = retriever.encode(query) | |
| print(query) | |
| query_vec_flat = [arr.flatten() for arr in query_vec] | |
| logger.warning(f'Finished query vec') | |
| #documents = table.search(query_vec_flat, vector_column_name=VECTOR_COLUMN_NAME).limit(top_k_rank).to_list() | |
| logger.warning(f'Finished search') | |
| documents = table.search(query_vec, vector_column_name=VECTOR_COLUMN_NAME).limit(top_rerank).to_list() | |
| documents = [doc[TEXT_COLUMN_NAME] for doc in documents] | |
| logger.warning(f'start cross encoder {len(documents)}') | |
| # Retrieve documents relevant to query | |
| query_doc_pair = [[query, doc] for doc in documents] | |
| cross_scores = cross_encoder.predict(query_doc_pair) | |
| sim_scores_argsort = list(reversed(np.argsort(cross_scores))) | |
| logger.warning(f'Finished cross encoder {len(documents)}') | |
| documents = [documents[idx] for idx in sim_scores_argsort[:top_k_rank]] | |
| logger.warning(f'num documents {len(documents)}') | |
| document_time = perf_counter() - document_start | |
| logger.warning(f'Finished Retrieving documents in {round(document_time, 2)} seconds...') | |
| # Create Prompt | |
| prompt = template.render(documents=documents, query=query) | |
| prompt_html = template_html.render(documents=documents, query=query) | |
| if api_kind == "HuggingFace": | |
| generate_fn = generate_hf | |
| elif api_kind == "Gemini": | |
| print("Gemini condition satisfied") | |
| generate_fn = generate_gemini | |
| elif api_kind is None: | |
| gr.Warning("API name was not provided") | |
| raise ValueError("API name was not provided") | |
| else: | |
| gr.Warning(f"API {api_kind} is not supported") | |
| raise ValueError(f"API {api_kind} is not supported") | |
| try: | |
| count_tokens = lambda text: len([token.strip() for token in text.split() if token.strip()]) | |
| print(prompt_html,'token count is',count_tokens(prompt_html)) | |
| history[-1][1] = "" | |
| for character in generate_fn(prompt, history[:-1]): | |
| history[-1][1] = character | |
| yield history, prompt_html | |
| print('final history is ',history) | |
| # return history[-1][1], prompt_html | |
| except Exception as e: # Catch any exception | |
| print('An unexpected error occurred during generation:', str(e)) | |
| yield f"An unexpected error occurred during generation: {str(e)}" | |
| with gr.Blocks(theme='WeixuanYuan/Soft_dark') as demo: | |
| # Beautiful heading with logo | |
| gr.HTML(value=""" | |
| <div style="display: flex; align-items: center; justify-content: space-between;"> | |
| <h1 style="color: #008000">ADWITIYA - <span style="color: #008000">Customs Manual Chatbot(UNDER MAINTENANCE..WILL BE BACK SOON)</span></h1> | |
| <img src='logo.png' alt="Chatbot" width="50" height="50" /> | |
| </div> | |
| """, elem_id="heading") | |
| # Formatted description | |
| gr.HTML(value="""<p style="font-family: sans-serif; font-size: 16px;">A free chat bot developed by National Customs Targeting Center using Open source LLMs.(Dedicated to 75th Batch IRS Probationers)</p>""", elem_id="description") | |
| chatbot = gr.Chatbot( | |
| [], | |
| elem_id="chatbot", | |
| avatar_images=('https://aui.atlassian.com/aui/8.8/docs/images/avatar-person.svg', | |
| 'https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.svg'), | |
| bubble_full_width=False, | |
| show_copy_button=True, | |
| show_share_button=True, | |
| ) | |
| chattext = gr.Textbox() | |
| with gr.Row(): | |
| txt = gr.Textbox( | |
| scale=3, | |
| show_label=False, | |
| placeholder="Enter text and press enter", | |
| container=False, | |
| ) | |
| txt_btn = gr.Button(value="Submit text", scale=1) | |
| api_kind = gr.Radio(choices=["HuggingFace","Gemini"], value="HuggingFace") | |
| #prompt_html = gr.HTML() | |
| prompt_html = gr.Textbox() | |
| try: | |
| # Turn off interactivity while generating if you click | |
| txt_msg = txt_btn.click(add_text, [chatbot, txt], [chatbot, txt], queue=False).then( | |
| bot, [chatbot, api_kind], [chattext, prompt_html]) | |
| except Exception as e: | |
| print ('Exception txt btn click ' ,str(e)) | |
| # Turn it back on | |
| txt_msg.then(lambda: gr.Textbox(interactive=True), None, [txt], queue=False) | |
| try: | |
| # Turn off interactivity while generating if you hit enter | |
| txt_msg = txt.submit(add_text, [chatbot, txt], [chatbot, txt], queue=False).then( | |
| bot, [chatbot, api_kind], [chattext, prompt_html]) | |
| except Exception as e: | |
| print ('Exception ' ,str(e)) | |
| # Turn it back on | |
| txt_msg.then(lambda: gr.Textbox(interactive=True), None, [txt], queue=False) | |
| # Examples | |
| gr.Examples(examples, txt) | |
| demo.queue() | |
| demo.launch(debug=True) | |