import gradio as gr from utils import create_user_id # Langchain from langchain.embeddings import HuggingFaceEmbeddings from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler # ClimateQ&A imports from anyqa.config import get_domains from anyqa.chains import load_qa_chain_with_text, load_reformulation_chain from anyqa.embeddings import EMBEDDING_MODEL_NAME from anyqa.llm import get_llm from anyqa.prompts import audience_prompts from anyqa.qa_logging import log from anyqa.retriever import QARetriever from anyqa.source_table import generate_source_table from anyqa.vectorstore import get_vectorstore # Load environment variables in local mode try: from dotenv import load_dotenv load_dotenv() except Exception as e: pass # Set up Gradio Theme theme = gr.themes.Base( primary_hue="blue", secondary_hue="red", font=[gr.themes.GoogleFont("Poppins"), "ui-sans-serif", "system-ui", "sans-serif"], ) init_prompt = "" system_template = { "role": "system", "content": init_prompt, } user_id = create_user_id() # --------------------------------------------------------------------------- # ClimateQ&A core functions # --------------------------------------------------------------------------- from langchain.callbacks.base import BaseCallbackHandler from queue import Empty from threading import Thread from langchain.schema import LLMResult from typing import Any, Union, Dict, List from queue import SimpleQueue # # Create a Queue # Q = Queue() import re def parse_output_llm_with_sources(output): # Split the content into a list of text and "[Doc X]" references content_parts = re.split(r"\[(Doc\s?\d+(?:,\s?Doc\s?\d+)*)\]", output) parts = [] for part in content_parts: if part.startswith("Doc"): subparts = part.split(",") subparts = [ subpart.lower().replace("doc", "").strip() for subpart in subparts ] subparts = [ f"{subpart}" for subpart in subparts ] parts.append("".join(subparts)) else: parts.append(part) content_parts = "".join(parts) return content_parts job_done = object() # signals the processing is done class StreamingGradioCallbackHandler(BaseCallbackHandler): def __init__(self, q: SimpleQueue): self.q = q def on_llm_start( self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any ) -> None: """Run when LLM starts running. Clean the queue.""" while not self.q.empty(): try: self.q.get(block=False) except Empty: continue def on_llm_new_token(self, token: str, **kwargs: Any) -> None: """Run on new LLM token. Only available when streaming is enabled.""" self.q.put(token) def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: """Run when LLM ends running.""" self.q.put(job_done) def on_llm_error( self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any ) -> None: """Run when LLM errors.""" self.q.put(job_done) # Create embeddings function and LLM embeddings_function = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL_NAME) # Create vectorstore and retriever vectorstore = get_vectorstore(embeddings_function) # --------------------------------------------------------------------------- # ClimateQ&A Streaming # From https://github.com/gradio-app/gradio/issues/5345 # And https://stackoverflow.com/questions/76057076/how-to-stream-agents-response-in-langchain # --------------------------------------------------------------------------- from threading import Thread def answer_user(query, query_example, history): return query, history + [[query, ". . ."]] def answer_user_example(query, query_example, history): return query_example, history + [[query_example, ". . ."]] def fetch_sources(query, domains): llm_reformulation = get_llm( max_tokens=512, temperature=0.0, verbose=True, streaming=False ) print("domains", domains) retriever = QARetriever( vectorstore=vectorstore, domains=domains, k_summary=0, k_total=10 ) reformulation_chain = load_reformulation_chain(llm_reformulation) # Calculate language output_reformulation = reformulation_chain({"query": query}) question = output_reformulation["question"] language = output_reformulation["language"] # Retrieve docs docs = retriever.get_relevant_documents(question) if len(docs) > 0: # Already display the sources sources_text = [] for i, d in enumerate(docs, 1): sources_text.append(make_html_source(d, i)) citations_text = "".join(sources_text) docs_text = "\n\n".join([d.page_content for d in docs]) return "", citations_text, docs_text, question, language else: sources_text = ( "⚠️ No relevant passages found in the scientific reports (IPCC and IPBES)" ) citations_text = "**⚠️ No relevant passages found in the sources, you may want to ask a more specific question.**" docs_text = "" return "", citations_text, docs_text, question, language def answer_bot(query, history, docs, question, language, audience): if audience == "Children": audience_prompt = audience_prompts["children"] elif audience == "General public": audience_prompt = audience_prompts["general"] elif audience == "Experts": audience_prompt = audience_prompts["experts"] else: audience_prompt = audience_prompts["experts"] # Prepare Queue for streaming LLMs Q = SimpleQueue() llm_streaming = get_llm( max_tokens=1000, temperature=0.0, verbose=True, streaming=True, callbacks=[StreamingGradioCallbackHandler(Q), StreamingStdOutCallbackHandler()], ) qa_chain = load_qa_chain_with_text(llm_streaming) def threaded_chain(question, audience, language, docs): try: response = qa_chain( { "question": question, "audience": audience, "language": language, "summaries": docs, } ) Q.put(response) Q.put(job_done) except Exception as e: print(e) history[-1][1] = "" textbox = gr.Textbox( placeholder=". . .", show_label=False, scale=1, lines=1, interactive=False ) if len(docs) > 0: # Start thread for streaming thread = Thread( target=threaded_chain, kwargs={ "question": question, "audience": audience_prompt, "language": language, "docs": docs, }, ) thread.start() while True: next_item = Q.get(block=True) # Blocks until an input is available if next_item is job_done: break elif isinstance(next_item, str): new_paragraph = history[-1][1] + next_item new_paragraph = parse_output_llm_with_sources(new_paragraph) history[-1][1] = new_paragraph yield textbox, history else: pass thread.join() log(question=question, history=history, docs=docs, user_id=user_id) else: complete_response = "**⚠️ No relevant passages found in the sources, you may want to ask a more specific question.**" history[-1][1] += complete_response yield "", history # --------------------------------------------------------------------------- # ClimateQ&A core functions # --------------------------------------------------------------------------- def make_html_source(source, i): meta = source.metadata content = source.page_content.split(":", 1)[1].strip() link = ( f'🔗' if "url" in meta else "" ) return f"""

Doc {i} - {meta['short_name']} - Page {int(meta['page_number'])}

{content}

""" def reset_textbox(): return gr.update(value="") # -------------------------------------------------------------------- # Gradio # -------------------------------------------------------------------- init_prompt = """ Hello, I'm a conversational assistant. I will answer your questions by **sifting through trusted data sources**. 💡 How to use - **Language**: You can ask me your questions in any language. - **Audience**: You can specify your audience (children, general public, experts) to get a more adapted answer. - **Sources**: You can choose to search in which sources you want me to look for answers. By default, I will search in all sources. ⚠️ Limitations *Please note that the AI is not perfect and may sometimes give irrelevant answers. If you are not satisfied with the answer, please ask a more specific question or report your feedback to help us improve the system.* ❓ What do you want to learn ? """ def vote(data: gr.LikeData): if data.liked: print(data.value) else: print(data) def change_tab(): return gr.Tabs.update(selected=1) with gr.Blocks(title="❓ Q&A", css="style.css", theme=theme) as demo: # user_id_state = gr.State([user_id]) with gr.Tab("❓ Q&A"): with gr.Row(elem_id="chatbot-row"): with gr.Column(scale=2): # state = gr.State([system_template]) bot = gr.Chatbot( value=[[None, init_prompt]], show_copy_button=True, show_label=False, elem_id="chatbot", layout="panel", avatar_images=("assets/bot_avatar.png", None), ) # bot.like(vote,None,None) with gr.Row(elem_id="input-message"): textbox = gr.Textbox( placeholder="Ask me anything here!", show_label=False, scale=1, lines=1, interactive=True, max_lines=2 ) # submit_button = gr.Button(">",scale = 1,elem_id = "submit-button") with gr.Column(scale=1, variant="panel", elem_id="right-panel"): with gr.Tabs() as tabs: with gr.TabItem("📝 Examples", elem_id="tab-examples", id=0): examples_hidden = gr.Textbox(elem_id="hidden-message") questions = [ "How does Daoism view our dependence on modern technology?", "From a Confucian perspective, what is the role of tradition in modern society?", "How might Daoism influence sustainable economic practices?", "Does Confucianism advocate for a particular economic model?", "How does Daoism interpret the dynamics of modern relationships?", "From a Confucian viewpoint, what are the responsibilities of individuals in a family?", "How might Daoism guide our approach to mental and physical health?", "Does Confucianism offer insights into educational methods?", "How does Daoism view the purpose and methods of modern education?", "From a Confucian perspective, what is the importance of social harmony?", ] examples_questions = gr.Examples( questions, [examples_hidden], examples_per_page=10, run_on_click=False, # cache_examples=True, ) with gr.Tab("📚 Citations", elem_id="tab-citations", id=1): sources_textbox = gr.HTML( show_label=False, elem_id="sources-textbox" ) docs_textbox = gr.State("") with gr.Tab("⚙️ Configuration", elem_id="tab-config", id=2): gr.Markdown( "Reminder: You can talk in any language, this tool is multi-lingual!" ) domains = get_domains() dropdown_domains = gr.CheckboxGroup( domains, label="Select source types", value=[], interactive=True, ) dropdown_audience = gr.Dropdown( ["Children", "General public", "Experts"], label="Select audience", value="Experts", interactive=True, ) output_query = gr.Textbox( label="Query used for retrieval", show_label=True, elem_id="reformulated-query", lines=2, interactive=False, ) output_language = gr.Textbox( label="Language", show_label=True, elem_id="language", lines=1, interactive=False, ) ( textbox.submit( answer_user, [textbox, examples_hidden, bot], [textbox, bot], queue=False, ) .success(change_tab, None, tabs) .success( fetch_sources, [textbox, dropdown_domains], [ textbox, sources_textbox, docs_textbox, output_query, output_language, ], ) .success( answer_bot, [ textbox, bot, docs_textbox, output_query, output_language, dropdown_audience, ], [textbox, bot], queue=True, ) .success(lambda x: textbox, [textbox], [textbox]) ) ( examples_hidden.change( answer_user_example, [textbox, examples_hidden, bot], [textbox, bot], queue=False, ) .success(change_tab, None, tabs) .success( fetch_sources, [textbox, dropdown_domains], [ textbox, sources_textbox, docs_textbox, output_query, output_language, ], ) .success( answer_bot, [ textbox, bot, docs_textbox, output_query, output_language, dropdown_audience, ], [textbox, bot], queue=True, ) .success(lambda x: textbox, [textbox], [textbox]) ) # --------------------------------------------------------------------------------------- # OTHER TABS # --------------------------------------------------------------------------------------- with gr.Tab("ℹ️ About", elem_classes="max-height"): gr.Markdown( """
💡 How does this tool work?
This tool harnesses modern OCR techniques to parse and preprocess documents. By leveraging state-of-the-art question-answering algorithms, our tool is able to sift through the extensive collection of trusted sources and identify relevant passages in response to user inquiries. Furthermore, the integration of the ChatGPT API allows Q&A to present complex data in a user-friendly manner, summarizing key points and facilitating communication to a wider audience.
""" ) gr.Markdown("## How to use") gr.Markdown( """ ### 💪 Getting started - In the chatbot section, simply type your question, and the app will provide an answer with references to relevant sources. - the app retrieves specific passages to help answer your question accurately. - Source information, including page numbers and passages, is displayed on the right side of the screen for easy verification. - Feel free to ask follow-up questions within the chatbot for a more in-depth understanding. - You can ask question in any language, the tool is multi-lingual ! """ ) gr.Markdown( """ ### ⚠️ Limitations
""" ) with gr.Tab("👩‍💻 Community"): gr.Markdown( """ We welcomes community contributions. To participate, head over to the Community Tab and create a "New Discussion" to ask questions and share your insights. *This tool is a fork from the work done by the R&D lab at **Ekimetrics** for Climate Q&A: https://climateqa.com/.* """ ) with gr.Tab("📚 Sources", elem_classes="max-height"): gr.Markdown(generate_source_table()) with gr.Tab("🛢️ Carbon Footprint"): gr.Markdown( """ Carbon emissions were measured during the development and inference process using CodeCarbon [https://github.com/mlco2/codecarbon](https://github.com/mlco2/codecarbon) | Phase | Description | Emissions | Source | | --- | --- | --- | --- | | Development | OCR and parsing all pdf documents with AI | 28gCO2e | CodeCarbon | | Development | Question Answering development | 114gCO2e | CodeCarbon | | Inference | Question Answering | ~0.102gCO2e / call | CodeCarbon | | Inference | API call to turbo-GPT | ~0.38gCO2e / call | https://medium.com/@chrispointon/the-carbon-footprint-of-chatgpt-e1bc14e4cc2a | Carbon Emissions are **relatively low but not negligible** compared to other usages: one question asked is around 0.482gCO2e - equivalent to 2.2m by car (https://datagir.ademe.fr/apps/impact-co2/) Or around 2 to 4 times more than a typical Google search. """ ) with gr.Tab("🪄 Changelog"): gr.Markdown( """ ##### v1.0.0 - 2023-10-25 - Forked ClimateQ&A - Added Chroma as vector store - Added support for OpenAI api - Added support for other topics """ ) demo.queue(concurrency_count=16) demo.launch()