import datetime
import io
import os
import re
import ssl
import sys
import warnings
from contextlib import closing, redirect_stdout
# Console to variable
from io import StringIO
from threading import Lock
from typing import Optional, Tuple

import boto3
import gradio as gr
import requests
from langchain import ConversationChain, LLMChain
from langchain.agents import load_tools, initialize_agent
from langchain.chains.conversation.memory import ConversationBufferMemory
from langchain.llms import OpenAI
from openai.error import AuthenticationError, InvalidRequestError, RateLimitError

# Pertains to Express-inator functionality
from langchain.prompts import PromptTemplate
from polly_utils import PollyVoiceData, NEURAL_ENGINE
from azure_utils import AzureVoiceData

# Pertains to question answering functionality
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores.faiss import FAISS
from langchain.docstore.document import Document
from langchain.chains.question_answering import load_qa_chain

# Both keys are required at import time; a missing variable raises KeyError.
news_api_key = os.environ["NEWS_API_KEY"]
tmdb_bearer_token = os.environ["TMDB_BEARER_TOKEN"]

TOOLS_LIST = ['serpapi', 'wolfram-alpha', 'pal-math', 'pal-colored-objects']
# TOOLS_DEFAULT_LIST = ['serpapi', 'pal-math']
TOOLS_DEFAULT_LIST = []
BUG_FOUND_MSG = "Congratulations, you've found a bug in this application!"
# AUTH_ERR_MSG = "Please paste your OpenAI key from openai.com to use this application. It is not necessary to hit a button or key after pasting it."
AUTH_ERR_MSG = "Please paste your OpenAI key from openai.com to use this application. \n"
MAX_TOKENS = 512

LOOPING_TALKING_HEAD = "videos/Marc.mp4"
TALKING_HEAD_WIDTH = "192"
MAX_TALKING_HEAD_TEXT_LENGTH = 155

# Pertains to Express-inator functionality
NUM_WORDS_DEFAULT = 0
MAX_WORDS = 400
FORMALITY_DEFAULT = "N/A"
TEMPERATURE_DEFAULT = 0.5
EMOTION_DEFAULT = "N/A"
LANG_LEVEL_DEFAULT = "N/A"
TRANSLATE_TO_DEFAULT = "N/A"
LITERARY_STYLE_DEFAULT = "N/A"
PROMPT_TEMPLATE = PromptTemplate(
    input_variables=["original_words", "num_words", "formality", "emotions", "lang_level", "translate_to",
                     "literary_style"],
    template="Restate {num_words}{formality}{emotions}{lang_level}{translate_to}{literary_style}the following: \n{original_words}\n",
)

# Maps a literary style choice to the phrase inserted into the restating prompt.
# Styles that are not listed contribute nothing to the prompt.
_LITERARY_STYLE_PHRASES = {
    "Prose": "as prose, ",
    "Story": "as a story, ",
    "Summary": "as a summary, ",
    "Outline": "as an outline numbers and lower case letters, ",
    "Bullets": "as bullet points using bullets, ",
    "Poetry": "as a poem, ",
    "Haiku": "as a haiku, ",
    "Limerick": "as a limerick, ",
    "Rap": "as a rap, ",
    "Joke": "as a very funny joke with a setup and punchline, ",
    "Knock-knock": "as a very funny knock-knock joke, ",
    "FAQ": "as a FAQ with several questions and answers, ",
}

POLLY_VOICE_DATA = PollyVoiceData()
AZURE_VOICE_DATA = AzureVoiceData()

# Pertains to WHISPER functionality
WHISPER_DETECT_LANG = "Detect language"

warnings.filterwarnings("ignore")

# Temporarily address Wolfram Alpha SSL certificate issue
# NOTE(review): this disables HTTPS certificate verification for every default
# context created in this process, not only for Wolfram Alpha requests.
ssl._create_default_https_context = ssl._create_unverified_context


# Pertains to Express-inator functionality
def transform_text(desc, express_chain, num_words, formality,
                   anticipation_level, joy_level, trust_level,
                   fear_level, surprise_level, sadness_level, disgust_level, anger_level,
                   lang_level, translate_to, literary_style):
    """Restate ``desc`` according to the selected Express-inator options.

    Each option that differs from its "N/A"/zero default contributes a phrase
    to the prompt built from PROMPT_TEMPLATE. When ``express_chain`` is truthy
    and at least one phrase is present, the chain is run and its stripped
    result is used; otherwise ``desc`` is used unchanged.

    Returns the resulting text with every newline doubled. The formatted
    prompt and the result are also printed to stdout.
    """
    num_words_prompt = ""
    if num_words and int(num_words) != 0:
        num_words_prompt = "using up to " + str(num_words) + " words, "

    # Formality and emotion levels are compared case-insensitively against "n/a".
    formality = formality.lower()
    formality_str = ""
    if formality != "n/a":
        formality_str = "in a " + formality + " manner, "

    emotion_levels = [
        level.lower()
        for level in (anticipation_level, joy_level, trust_level, fear_level,
                      surprise_level, sadness_level, disgust_level, anger_level)
    ]
    emotions = [level for level in emotion_levels if level != "n/a"]

    emotions_str = ""
    if len(emotions) == 1:
        emotions_str = "with emotion of " + emotions[0] + ", "
    elif emotions:
        emotions_str = "with emotions of " + ", ".join(emotions[:-1]) + " and " + emotions[-1] + ", "

    # The language level is expressed on its own only when no translation is
    # requested; otherwise it is folded into the translation phrase below.
    lang_level_str = ""
    if lang_level != LANG_LEVEL_DEFAULT and translate_to == TRANSLATE_TO_DEFAULT:
        lang_level_str = "at a " + lang_level + " level, "

    translate_to_str = ""
    if translate_to != TRANSLATE_TO_DEFAULT:
        level_prefix = "" if lang_level == LANG_LEVEL_DEFAULT else lang_level + " level "
        translate_to_str = "translated to " + level_prefix + translate_to + ", "

    literary_style_str = ""
    if literary_style != LITERARY_STYLE_DEFAULT:
        literary_style_str = _LITERARY_STYLE_PHRASES.get(literary_style, "")

    prompt_vars = {
        'original_words': desc,
        'num_words': num_words_prompt,
        'formality': formality_str,
        'emotions': emotions_str,
        'lang_level': lang_level_str,
        'translate_to': translate_to_str,
        'literary_style': literary_style_str,
    }
    formatted_prompt = PROMPT_TEMPLATE.format(**prompt_vars)

    trans_instr = (num_words_prompt + formality_str + emotions_str + lang_level_str
                   + translate_to_str + literary_style_str)
    if express_chain and len(trans_instr.strip()) > 0:
        generated_text = express_chain.run(prompt_vars).strip()
    else:
        print("Not transforming text")
        generated_text = desc

    # Double every newline in the generated text.
    generated_text = generated_text.replace("\n", "\n\n")

    prompt_plus_generated = "GPT prompt: " + formatted_prompt + "\n\n" + generated_text

    print("\n==== date/time: " + str(datetime.datetime.now() - datetime.timedelta(hours=5)) + " ====")
    print("prompt_plus_generated: " + prompt_plus_generated)

    return generated_text


def load_chain(tools_list, llm):
    """Build the conversational agent, the restating chain and their memory.

    Returns ``(chain, express_chain, memory)``; all three are None when ``llm``
    is falsy. Reads SERP_API_KEY from the environment when ``llm`` is set.
    """
    chain = None
    express_chain = None
    memory = None
    if llm:
        print("\ntools_list", tools_list)
        tool_names = tools_list
        tools = load_tools(tool_names, llm=llm, news_api_key=news_api_key, tmdb_bearer_token=tmdb_bearer_token,
                           serpapi_api_key=os.environ["SERP_API_KEY"])

        memory = ConversationBufferMemory(memory_key="chat_history")

        chain = initialize_agent(tools, llm, agent="conversational-react-description", verbose=True, memory=memory)
        express_chain = LLMChain(llm=llm, prompt=PROMPT_TEMPLATE, verbose=True)
    return chain, express_chain, memory


def set_openai_api_key(api_key):
    """Set the api key and return the objects built with it.

    Returns ``(chain, express_chain, llm, embeddings, qa_chain, memory)``.
    If ``api_key`` is empty, does not start with "sk-" or is not longer than
    50 characters, six Nones are returned.
    """
    if not (api_key and api_key.startswith("sk-") and len(api_key) > 50):
        return None, None, None, None, None, None

    os.environ["OPENAI_API_KEY"] = api_key
    try:
        print("\n\n ++++++++++++++ Setting OpenAI API key ++++++++++++++ \n\n")
        print(str(datetime.datetime.now()) + ": Before OpenAI, OPENAI_API_KEY length: " + str(
            len(os.environ["OPENAI_API_KEY"])))
        llm = OpenAI(temperature=0, max_tokens=MAX_TOKENS)
        print(str(datetime.datetime.now()) + ": After OpenAI, OPENAI_API_KEY length: " + str(
            len(os.environ["OPENAI_API_KEY"])))
        chain, express_chain, memory = load_chain(TOOLS_DEFAULT_LIST, llm)

        # Pertains to question answering functionality
        embeddings = OpenAIEmbeddings()
        qa_chain = load_qa_chain(OpenAI(temperature=0), chain_type="stuff")

        print(str(datetime.datetime.now()) + ": After load_chain, OPENAI_API_KEY length: " + str(
            len(os.environ["OPENAI_API_KEY"])))
    finally:
        # Blank the key in the process environment even when construction
        # fails, so it is not left behind in os.environ.
        os.environ["OPENAI_API_KEY"] = ""
    return chain, express_chain, llm, embeddings, qa_chain, memory


def _run_chain_reporting_errors(chain, inp, log_label):
    """Run ``chain`` on ``inp`` and turn any exception into a message.

    Returns ``(output, error_text)``: ``(result, None)`` on success and
    ``("", message)`` when the chain raised. ``log_label`` is the label printed
    in front of the authentication error message.
    """
    try:
        return chain.run(input=inp), None
    except AuthenticationError as ae:
        error_text = AUTH_ERR_MSG + str(datetime.datetime.now()) + ". " + str(ae)
        print(log_label, error_text)
    except RateLimitError as rle:
        error_text = "\n\nRateLimitError: " + str(rle)
    except ValueError as ve:
        error_text = "\n\nValueError: " + str(ve)
    except InvalidRequestError as ire:
        error_text = "\n\nInvalidRequestError: " + str(ire)
    except Exception as e:
        error_text = "\n\n" + BUG_FOUND_MSG + ":\n\n" + str(e)
    return "", error_text


def run_chain(chain, inp, capture_hidden_text):
    """Run the agent chain and optionally capture what it prints.

    With ``capture_hidden_text`` set, stdout is redirected while the chain
    runs; the captured text is cleaned up and returned as ``hidden_text``, with
    any error message appended, and ``output`` is "" on error. Without it,
    ``hidden_text`` is None and an error message is returned as ``output``.

    Returns ``(output, hidden_text)``.
    """
    if not capture_hidden_text:
        output, error_msg = _run_chain_reporting_errors(chain, inp, "output")
        if error_msg is not None:
            output = error_msg
        return output, None

    hidden_text_io = StringIO()
    # redirect_stdout restores sys.stdout even if a BaseException escapes.
    with redirect_stdout(hidden_text_io):
        output, error_msg = _run_chain_reporting_errors(chain, inp, "error_msg")
    hidden_text = hidden_text_io.getvalue()

    # remove escape characters from hidden_text
    hidden_text = re.sub(r'\x1b[^m]*m', '', hidden_text)

    # remove "Entering new AgentExecutor chain..." from hidden_text
    hidden_text = re.sub(r"Entering new AgentExecutor chain...\n", "", hidden_text)

    # remove "Finished chain." from hidden_text
    hidden_text = re.sub(r"Finished chain.", "", hidden_text)

    # Put a blank line before "Thought:" "Action:" "Observation:" "Input:" and "AI:"
    for label in ("Thought:", "Action:", "Observation:", "Input:", "AI:"):
        hidden_text = re.sub(label, "\n\n" + label, hidden_text)

    if error_msg:
        hidden_text += error_msg

    print("hidden_text: ", hidden_text)
    return output, hidden_text


def reset_memory(history, memory):
    """Clear the conversation memory and return an empty chat history.

    ``memory`` may be None (no chain has been built yet); it is then returned
    untouched. Returns ``(history, history, memory)`` with ``history == []``.
    """
    if memory is not None:
        memory.clear()
    history = []
    return history, history, memory


class ChatWrapper:
    """Callable chat handler that processes one message at a time under a lock."""

    def __init__(self):
        self.lock = Lock()

    def __call__(
            self, api_key: str, inp: str, history: Optional[Tuple[str, str]], chain: Optional[ConversationChain],
            trace_chain: bool, speak_text: bool, talking_head: bool, monologue: bool, express_chain: Optional[LLMChain],
            num_words, formality, anticipation_level, joy_level, trust_level,
            fear_level, surprise_level, sadness_level, disgust_level, anger_level,
            lang_level, translate_to, literary_style, qa_chain, docsearch, use_embeddings
    ):
        """Execute the chat functionality.

        Produces a reply for ``inp`` (agent chain, embeddings question
        answering, or the input itself in monologue mode), passes it through
        ``transform_text``, appends it to ``history`` and optionally produces
        speech audio and/or talking-head video markup.

        Returns ``(history, history, html_video, temp_file, html_audio,
        temp_aud_file, "")``; the trailing empty string is the value for the
        last output of the handler.
        """
        with self.lock:
            print("\n==== date/time: " + str(datetime.datetime.now()) + " ====")
            print("inp: " + inp)
            print("trace_chain: ", trace_chain)
            print("speak_text: ", speak_text)
            print("talking_head: ", talking_head)
            print("monologue: ", monologue)
            history = history or []
            # If chain is None, that is because no API key was provided.
            output = "Please paste your OpenAI key from openai.com to use this app. " + str(datetime.datetime.now())
            hidden_text = output

            if chain:
                # Set OpenAI key
                import openai
                openai.api_key = api_key
                if monologue:
                    output, hidden_text = inp, None
                elif not use_embeddings:
                    output, hidden_text = run_chain(chain, inp, capture_hidden_text=trace_chain)
                elif not (inp and inp.strip() != ""):
                    output, hidden_text = "What's on your mind?", None
                elif docsearch:
                    docs = docsearch.similarity_search(inp)
                    output = str(qa_chain.run(input_documents=docs, question=inp))
                    # No reasoning trace exists for this path; do not leave the
                    # "paste your key" placeholder in hidden_text.
                    hidden_text = None
                else:
                    output, hidden_text = "Please supply some text in the the Embeddings tab.", None

            output = transform_text(output, express_chain, num_words, formality, anticipation_level, joy_level,
                                    trust_level,
                                    fear_level, surprise_level, sadness_level, disgust_level, anger_level,
                                    lang_level, translate_to, literary_style)

            text_to_display = output
            # hidden_text is None on the paths that produce no trace; only
            # prepend it when there is something to show.
            if trace_chain and hidden_text is not None:
                text_to_display = hidden_text + "\n\n" + output
            history.append((inp, text_to_display))

            html_video, temp_file, html_audio, temp_aud_file = None, None, None, None
            if speak_text:
                if talking_head:
                    if len(output) <= MAX_TALKING_HEAD_TEXT_LENGTH:
                        html_video, temp_file = do_html_video_speak(output, translate_to)
                    else:
                        # Text too long for the lip-sync call: loop the idle
                        # video and speak through audio instead.
                        temp_file = LOOPING_TALKING_HEAD
                        html_video = create_html_video(temp_file, TALKING_HEAD_WIDTH)
                        html_audio, temp_aud_file = do_html_audio_speak(output, translate_to)
                else:
                    html_audio, temp_aud_file = do_html_audio_speak(output, translate_to)
            elif talking_head:
                temp_file = LOOPING_TALKING_HEAD
                html_video = create_html_video(temp_file, TALKING_HEAD_WIDTH)
            # else:
            #     html_audio, temp_aud_file = do_html_audio_speak(output, translate_to)
            #     html_video = create_html_video(temp_file, "128")

        return history, history, html_video, temp_file, html_audio, temp_aud_file, ""
        # return history, history, html_audio, temp_aud_file, ""


chat = ChatWrapper()


def do_html_audio_speak(words_to_speak, polly_language):
    """Synthesize ``words_to_speak`` with Amazon Polly into audios/tempfile.mp3.

    Reads AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from the environment.
    Returns ``(html_audio, "audios/tempfile.mp3")`` on success and
    ``(None, None)`` when the response has no audio stream or the file cannot
    be written.
    """
    polly_client = boto3.Session(
        aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
        aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
        region_name='eu-west-2'
    ).client('polly')

    # voice_id, language_code, engine = POLLY_VOICE_DATA.get_voice(polly_language, "Female")
    voice_id, language_code, engine = POLLY_VOICE_DATA.get_voice(polly_language, "Male")
    if not voice_id:
        # No voice found for the requested language: fall back to a fixed voice.
        # voice_id = "Joanna"
        voice_id = "Matthew"
        language_code = "en-US"
        engine = NEURAL_ENGINE
    response = polly_client.synthesize_speech(
        Text=words_to_speak,
        OutputFormat='mp3',
        VoiceId=voice_id,
        LanguageCode=language_code,
        Engine=engine
    )

    html_audio = '\nno audio\n'

    # Save the audio stream returned by Amazon Polly on Lambda's temp directory
    if "AudioStream" not in response:
        # The response didn't contain audio data, exit gracefully
        return None, None

    with closing(response["AudioStream"]) as stream:
        # output = os.path.join("/tmp/", "speech.mp3")
        try:
            with open('audios/tempfile.mp3', 'wb') as f:
                f.write(stream.read())
            temp_aud_file = gr.File("audios/tempfile.mp3")
            temp_aud_file_url = "/file=" + temp_aud_file.value['name']
            # NOTE(review): the markup literal is empty, so temp_aud_file_url is
            # never used; presumably an audio tag belongs here. Confirm.
            html_audio = f''
        except IOError as error:
            # Could not write to file, exit gracefully
            print(error)
            return None, None

    return html_audio, "audios/tempfile.mp3"


def create_html_video(file_name, width):
    """Return the HTML markup for the talking-head video.

    Reads the module-level ``tmp_file`` component, so it raises NameError if
    called before that component has been created.
    """
    # NOTE(review): file_name and width are not used, and the markup literal
    # is empty; presumably a video tag belongs here. Confirm.
    temp_file_url = "/file=" + tmp_file.value['name']
    html_video = f''
    return html_video


def do_html_video_speak(words_to_speak, azure_language):
    """Request a lip-synced video for ``words_to_speak`` and save it locally.

    Posts to the exh.ai lipsync endpoint using EXHUMAN_API_KEY from the
    environment and writes the response body to videos/tempfile.mp4.
    Returns ``(html_video, "videos/tempfile.mp4")``.
    """
    azure_voice = AZURE_VOICE_DATA.get_voice(azure_language, "Male")
    if not azure_voice:
        azure_voice = "en-US-ChristopherNeural"

    headers = {"Authorization": f"Bearer {os.environ['EXHUMAN_API_KEY']}"}
    body = {
        'bot_name': 'Marc',
        'bot_response': words_to_speak,
        'azure_voice': azure_voice,
        'azure_style': 'friendly',
        'animation_pipeline': 'high_speed',
        'idle_url': 'https://ugc-idle.s3-us-west-2.amazonaws.com/580bd351e4f22629d1618f4f4d80d9a2.mp4'
    }
    api_endpoint = "https://api.exh.ai/animations/v1/generate_lipsync"
    res = requests.post(api_endpoint, json=body, headers=headers)
    print("res.status_code: ", res.status_code)

    html_video = '\nno video\n'
    # NOTE(review): the status code is printed but not checked, so an error
    # response body would also be written out as the video file.
    if isinstance(res.content, bytes):
        response_stream = io.BytesIO(res.content)
        print("len(res.content)): ", len(res.content))

        with open('videos/tempfile.mp4', 'wb') as f:
            f.write(response_stream.read())
        temp_file = gr.File("videos/tempfile.mp4")
        temp_file_url = "/file=" + temp_file.value['name']
        # NOTE(review): empty markup literal; temp_file_url is unused. Confirm.
        html_video = f''
    else:
        print('video url unknown')
    return html_video, "videos/tempfile.mp4"


def update_selected_tools(widget, state, llm):
    """Rebuild the chains for the tools selected in ``widget``.

    Returns ``(state, llm, chain, express_chain)`` when ``widget`` is truthy.
    With a falsy ``widget`` (e.g. no tool selected) nothing is rebuilt and the
    function returns None.
    """
    if widget:
        state = widget
        chain, express_chain, memory = load_chain(state, llm)
        return state, llm, chain, express_chain


def update_talking_head(widget, state):
    """Return the new talking-head state and the markup for the video area.

    A truthy ``widget`` yields ``(widget, looping video markup)``; otherwise
    ``(None, "\\n")`` is returned.
    """
    if widget:
        state = widget

        video_html_talking_head = create_html_video(LOOPING_TALKING_HEAD, TALKING_HEAD_WIDTH)
        return state, video_html_talking_head
    else:
        # return state, create_html_video(LOOPING_TALKING_HEAD, "32")
        return None, "\n"


def update_foo(widget, state):
    """Return the new state value for a widget change.

    A truthy ``widget`` value becomes the state. A falsy value yields None,
    so ``state`` itself is never returned.
    """
    if not widget:
        return None
    return widget


# Pertains to question answering functionality
def update_embeddings(embeddings_text, embeddings, qa_chain):
    """Build a FAISS index from the supplied text.

    The text is split with a CharacterTextSplitter (chunk_size=1000,
    chunk_overlap=0) and indexed using ``embeddings``. ``qa_chain`` is
    accepted but not used. Returns the index, or None when
    ``embeddings_text`` is falsy.
    """
    if not embeddings_text:
        return None

    splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    chunks = splitter.split_text(embeddings_text)
    index = FAISS.from_texts(chunks, embeddings)
    print("Embeddings updated")
    return index


# Pertains to question answering functionality
def update_use_embeddings(widget, state):
    """Return the new "use embeddings" state for a widget change.

    Yields ``widget`` when it is truthy and None otherwise; the previous
    ``state`` is never returned.
    """
    return widget if widget else None


with gr.Blocks(css="css/custom_css.css", title="Expert Answer Demo") as block:
    # ---- Per-session state ----
    llm_state = gr.State()
    history_state = gr.State()
    chain_state = gr.State()
    express_chain_state = gr.State()
    tools_list_state = gr.State(TOOLS_DEFAULT_LIST)
    trace_chain_state = gr.State(False)
    speak_text_state = gr.State(False)
    talking_head_state = gr.State(True)
    monologue_state = gr.State(False)  # Takes the input and repeats it back to the user, optionally transforming it.
    memory_state = gr.State()

    # Pertains to Express-inator functionality
    num_words_state = gr.State(NUM_WORDS_DEFAULT)
    formality_state = gr.State(FORMALITY_DEFAULT)
    anticipation_level_state = gr.State(EMOTION_DEFAULT)
    joy_level_state = gr.State(EMOTION_DEFAULT)
    trust_level_state = gr.State(EMOTION_DEFAULT)
    fear_level_state = gr.State(EMOTION_DEFAULT)
    surprise_level_state = gr.State(EMOTION_DEFAULT)
    sadness_level_state = gr.State(EMOTION_DEFAULT)
    disgust_level_state = gr.State(EMOTION_DEFAULT)
    anger_level_state = gr.State(EMOTION_DEFAULT)
    lang_level_state = gr.State(LANG_LEVEL_DEFAULT)
    translate_to_state = gr.State(TRANSLATE_TO_DEFAULT)
    literary_style_state = gr.State(LITERARY_STYLE_DEFAULT)

    # Pertains to WHISPER functionality
    whisper_lang_state = gr.State(WHISPER_DETECT_LANG)

    # Pertains to question answering functionality
    embeddings_state = gr.State()
    qa_chain_state = gr.State()
    docsearch_state = gr.State()
    use_embeddings_state = gr.State(False)

    # NOTE(review): this header contains only whitespace; presumably its
    # markup was lost. Confirm against the deployed app.
    gr.HTML("\n        \n\n")

    # ---- Chat tab: key entry, talking head / audio, conversation ----
    with gr.Tab("Chat"):
        with gr.Row():
            openai_api_key_textbox = gr.Textbox(placeholder="Paste your OpenAI API key (sk-...)",
                                                show_label=False, lines=1, type='password',
                                                elem_id="gr-component")

        with gr.Row():
            with gr.Column(scale=1, min_width=TALKING_HEAD_WIDTH, visible=True):
                speak_text_cb = gr.Checkbox(label="Enable speech", value=False)
                speak_text_cb.change(update_foo, inputs=[speak_text_cb, speak_text_state],
                                     outputs=[speak_text_state])

                my_file = gr.File(label="Upload a file", type="file", visible=False)
                # create_html_video reads this component as a module-level name.
                tmp_file = gr.File(LOOPING_TALKING_HEAD, visible=False)
                # tmp_file_url = "/file=" + tmp_file.value['name']
                htm_video = create_html_video(LOOPING_TALKING_HEAD, TALKING_HEAD_WIDTH)
                video_html = gr.HTML(htm_video)

                # my_aud_file = gr.File(label="Audio file", type="file", visible=True)
                tmp_aud_file = gr.File("audios/tempfile.mp3", visible=False)
                tmp_aud_file_url = "/file=" + tmp_aud_file.value['name']
                htm_audio = f''
                audio_html = gr.HTML(htm_audio)

            with gr.Column(scale=7):
                chatbot = gr.Chatbot(elem_id="gr-component")

        with gr.Row():
            message = gr.Textbox(label="What's on your mind?",
                                 placeholder="What's the answer to life, the universe, and everything?",
                                 lines=1, elem_id="gr-component")
            submit = gr.Button(value="Send", variant="secondary").style(full_width=False)

    # ---- Settings tab: tool selection, tracing, talking head, reset ----
    with gr.Tab("Settings"):
        tools_cb_group = gr.CheckboxGroup(label="Tools:", choices=TOOLS_LIST,
                                          value=TOOLS_DEFAULT_LIST, elem_id="gr-component")
        tools_cb_group.change(update_selected_tools,
                              inputs=[tools_cb_group, tools_list_state, llm_state],
                              outputs=[tools_list_state, llm_state, chain_state, express_chain_state])

        trace_chain_cb = gr.Checkbox(label="Show reasoning chain in chat bubble", value=False,
                                     elem_id="gr-component")
        trace_chain_cb.change(update_foo, inputs=[trace_chain_cb, trace_chain_state],
                              outputs=[trace_chain_state])

        # speak_text_cb = gr.Checkbox(label="Speak text from agent", value=False)
        # speak_text_cb.change(update_foo, inputs=[speak_text_cb, speak_text_state],
        #                      outputs=[speak_text_state])

        talking_head_cb = gr.Checkbox(label="Show talking head", value=True)
        talking_head_cb.change(update_talking_head, inputs=[talking_head_cb, talking_head_state],
                               outputs=[talking_head_state, video_html])

        # monologue_cb = gr.Checkbox(label="Babel fish mode (translate/restate what you enter, no conversational agent)",
        #                            value=False)
        # monologue_cb.change(update_foo, inputs=[monologue_cb, monologue_state],
        #                     outputs=[monologue_state])

        reset_btn = gr.Button(value="Reset chat", variant="secondary").style(full_width=False)
        reset_btn.click(reset_memory, inputs=[history_state, memory_state],
                        outputs=[chatbot, history_state, memory_state])

    # ---- Event wiring for sending a message ----
    # Pressing Enter in the textbox and clicking "Send" call the same handler
    # with the same components, in the positional order of ChatWrapper.__call__.
    chat_inputs = [openai_api_key_textbox, message, history_state, chain_state, trace_chain_state,
                   speak_text_state, talking_head_state, monologue_state,
                   express_chain_state, num_words_state, formality_state,
                   anticipation_level_state, joy_level_state, trust_level_state, fear_level_state,
                   surprise_level_state, sadness_level_state, disgust_level_state, anger_level_state,
                   lang_level_state, translate_to_state, literary_style_state,
                   qa_chain_state, docsearch_state, use_embeddings_state]
    chat_outputs = [chatbot, history_state, video_html, my_file, audio_html, tmp_aud_file, message]
    # outputs=[chatbot, history_state, message])
    # outputs=[chatbot, history_state, audio_html, tmp_aud_file, message])

    message.submit(chat, inputs=chat_inputs, outputs=chat_outputs)
    submit.click(chat, inputs=chat_inputs, outputs=chat_outputs)

    # Rebuild the chains whenever the API key textbox changes.
    openai_api_key_textbox.change(set_openai_api_key,
                                  inputs=[openai_api_key_textbox],
                                  outputs=[chain_state, express_chain_state, llm_state, embeddings_state,
                                           qa_chain_state, memory_state])

block.launch(debug=True)