Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import nltk | |
| import edge_tts | |
| import tempfile | |
| import asyncio | |
# Fetch the sentence-tokenizer data that nltk.sent_tokenize needs at runtime.
# "punkt" is the resource older NLTK releases load; newer releases look up
# "punkt_tab" instead and raise LookupError when only "punkt" is present,
# so both are requested here.
nltk.download("punkt")
nltk.download("punkt_tab")
def format_prompt(message, history):
    """Build the instruction-tagged prompt string sent to the LLM.

    The prompt starts with the coaching system message wrapped in
    ``<s>[INST] ... [/INST]``, followed by every past exchange in
    ``history`` and finally the new user ``message``.

    Args:
        message: The latest user utterance. An empty or ``None`` message is
            replaced by ``"Hello"`` so the model always has something to
            answer.
        history: Iterable of ``(user_prompt, bot_response)`` pairs from
            earlier turns. A ``None`` user prompt or bot response is skipped
            rather than rendered as the literal text ``"None"``.

    Returns:
        The fully assembled prompt as a single string.
    """
    # Kept as explicit line-by-line literals so the text sent to the model is
    # independent of how this function happens to be indented.
    system_message = (
        "\n"
        "You are an empathetic, insightful, and supportive training coach who helps people deal with challenges and celebrate achievements.\n"
        "You help people feel better by asking questions to reflect on and evoke feelings of positivity, gratitude, joy, and love.\n"
        "You show radical candor and tough love.\n"
        "Respond in a casual and friendly tone.\n"
        "Sprinkle in filler words, contractions, idioms, and other casual speech that we use in conversation.\n"
        "Emulate the user’s speaking style and be concise in your response.\n"
    )

    pieces = ["<s>[INST]" + system_message + "[/INST]"]
    for user_prompt, bot_response in history:
        if user_prompt is not None:
            pieces.append(f"[INST] {user_prompt} [/INST]")
        # A turn that has not been answered yet carries None; emitting it
        # would put the word "None" into the model's context.
        if bot_response is not None:
            pieces.append(f" {bot_response}</s> ")

    # Treat None the same as an empty message.
    if not message:
        message = "Hello"
    pieces.append(f"[INST] {message} [/INST]")
    return "".join(pieces)
def generate_llm_output(
    prompt,
    history,
    llm,
    temperature=0.8,
    max_tokens=256,
    top_p=0.95,
    stop_words=("<s>", "[/INST]", "</s>"),
):
    """Stream the LLM's reply, yielding the accumulated text after each chunk.

    Args:
        prompt: The latest user message.
        history: Earlier ``(user, bot)`` exchanges, passed to
            ``format_prompt`` together with ``prompt``.
        llm: Callable invoked as ``llm(text, temperature=..., max_tokens=...,
            top_p=..., stop=..., stream=True)``; it must return an iterable of
            chunks where ``chunk["choices"][0]["text"]`` is the new text.
        temperature: Sampling temperature; values below 0.01 are raised to
            0.01.
        max_tokens: Generation limit forwarded to ``llm``.
        top_p: Nucleus-sampling value forwarded to ``llm``.
        stop_words: Sequences that end generation. They are forwarded to
            ``llm`` as ``stop`` and also checked against each streamed chunk.

    Yields:
        The reply accumulated so far (a growing string). If generation fails,
        a fixed apology sentence is yielded so the caller still has something
        to show and speak.
    """
    temperature = max(float(temperature), 1e-2)
    generate_kwargs = dict(
        temperature=temperature,
        max_tokens=max_tokens,
        top_p=float(top_p),
        # Hand the backend a real list regardless of what sequence type the
        # caller supplied.
        stop=list(stop_words),
    )

    formatted_prompt = format_prompt(prompt, history)

    try:
        print("LLM Input:", formatted_prompt)
        # Local GGUF
        stream = llm(
            formatted_prompt,
            **generate_kwargs,
            stream=True,
        )
        output = ""
        for response in stream:
            character = response["choices"][0]["text"]
            print(character)
            if character in stop_words:
                # end of context
                return
            output += character
            yield output
    except Exception as e:
        print("Unhandled Exception: ", str(e))
        gr.Warning("Unfortunately Mistral is unable to process")
        # This function is a generator: a plain `return value` would be
        # invisible to callers iterating over it, so the fallback is yielded.
        yield "I do not know what happened but I could not understand you ."
def tts_interface(text, voice):
    """Synchronous entry point for speech synthesis.

    Runs the ``text_to_speech`` coroutine to completion with ``asyncio.run``
    and returns whatever it returns.
    """
    return asyncio.run(text_to_speech(text, voice))
async def text_to_speech(text, voice, rate=10, pitch=10):
    """Synthesize ``text`` with edge-tts and return the path of the MP3 file.

    Args:
        text: The text to speak.
        voice: Voice label. Only the part before the first ``" - "`` is
            passed to edge-tts, so a label of the form
            ``"<short name> - <description>"`` is accepted as well as a bare
            short name.
        rate: Speaking-rate change in percent, sent as e.g. ``"+10%"``.
            Defaults to 10.
        pitch: Pitch change in Hz, sent as e.g. ``"+10Hz"``. Defaults to 10.

    Returns:
        Path of a newly created temporary ``.mp3`` file. The file is not
        deleted automatically (``delete=False``); cleanup is left to the
        caller.
    """
    rate_str = f"{int(rate):+d}%"
    pitch_str = f"{int(pitch):+d}Hz"
    voice_short_name = voice.split(" - ")[0]
    communicate = edge_tts.Communicate(
        text, voice_short_name, rate=rate_str, pitch=pitch_str
    )

    # Only reserve a unique path here and close the handle straight away, so
    # the save below is not writing to a file this process still holds open.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
        tmp_path = tmp_file.name

    await communicate.save(tmp_path)
    return tmp_path
def _clean_response_text(text):
    """Flatten newlines and strip stray role markers before sentence splitting."""
    text = text.replace("\n", " ").replace("<|assistant|>", " ")
    for marker in ("<|ass>", "[/ASST]", "[/ASSI]", "[/ASS]"):
        text = text.replace(marker, "")
    return text.strip()


def get_sentence(history, llm):
    """Stream the LLM reply for the last turn, one complete sentence at a time.

    The reply is generated with ``generate_llm_output`` and written into
    ``history[-1][1]`` as it grows. Each time the sentence tokenizer reports a
    new finished sentence, it is yielded so it can be processed while the
    model is still generating. A first sentence of at most 15 characters that
    ends in ``.``, ``!`` or ``?`` is held back and prepended to the sentence
    that follows it (per the original author's note, for proper language
    auto-detection).

    Args:
        history: List of ``[user_message, bot_response]`` pairs; the last
            entry is the turn being answered and is mutated in place. ``None``
            or an empty list is treated as a single empty turn.
        llm: Model callable forwarded to ``generate_llm_output``.

    Yields:
        ``(sentence, history)`` tuples, where ``history`` is the same list
        with the reply generated so far in its last entry.
    """
    # An empty list would otherwise fail on history[-1] just like None would.
    if not history:
        history = [["", None]]
    history[-1][1] = ""

    sentence_list = []
    seen_hashes = set()
    stored_sentence = None
    stored_sentence_hash = None

    for character in generate_llm_output(history[-1][0], history[:-1], llm):
        history[-1][1] = character.replace("<|assistant|>", "")
        # It is coming word by word
        text_to_generate = nltk.sent_tokenize(_clean_response_text(history[-1][1]))

        # With a single sentence we cannot know yet whether it is finished.
        if len(text_to_generate) <= 1:
            continue

        dif = len(text_to_generate) - len(sentence_list)

        # The tokenizer may re-merge text it split earlier; there is nothing
        # new to emit then, and indexing below would run past the list.
        if dif <= 0:
            continue

        if dif == 1 and len(sentence_list) != 0:
            continue

        if dif == 2 and len(sentence_list) != 0 and stored_sentence is not None:
            continue

        # All this complexity due to trying append first short sentence to
        # next one for proper language auto-detect
        if stored_sentence is not None and stored_sentence_hash is None and dif > 1:
            # means we consumed stored sentence and should look at next
            # sentence to generate
            sentence = text_to_generate[len(sentence_list) + 1]
        elif (
            stored_sentence is not None
            and len(text_to_generate) > 2
            and stored_sentence_hash is not None
        ):
            print("Appending stored")
            sentence = stored_sentence + text_to_generate[len(sentence_list) + 1]
            stored_sentence_hash = None
        else:
            sentence = text_to_generate[len(sentence_list)]

        # too short sentence just append to next one if there is any
        # this is for proper language detection
        if len(sentence) <= 15 and stored_sentence_hash is None and stored_sentence is None:
            if sentence[-1] in [".", "!", "?"]:
                if stored_sentence_hash != hash(sentence):
                    stored_sentence = sentence
                    stored_sentence_hash = hash(sentence)
                    print("Storing:", stored_sentence)
                    continue

        sentence_hash = hash(sentence)
        if stored_sentence_hash is not None and sentence_hash == stored_sentence_hash:
            continue

        if sentence_hash not in seen_hashes:
            seen_hashes.add(sentence_hash)
            sentence_list.append(sentence)
            print("New Sentence: ", sentence)
            yield (sentence, history)

    # return that final sentence token
    try:
        last_sentence = nltk.sent_tokenize(_clean_response_text(history[-1][1]))[-1]
    except Exception:
        # Typically an empty reply (no sentences to index). `Exception`
        # rather than a bare except, so GeneratorExit and KeyboardInterrupt
        # are not swallowed inside this generator.
        print("ERROR on last sentence history is :", history)
        return

    sentence_hash = hash(last_sentence)
    if sentence_hash not in seen_hashes:
        if stored_sentence is not None and stored_sentence_hash is not None:
            last_sentence = stored_sentence + last_sentence
            stored_sentence = stored_sentence_hash = None
            print("Last Sentence with stored:", last_sentence)

        seen_hashes.add(sentence_hash)
        sentence_list.append(last_sentence)
        print("Last Sentence: ", last_sentence)
        yield (last_sentence, history)