Spaces:
Sleeping
Sleeping
import asyncio
import base64
import contextlib
import os
import re
import tempfile

from project.config import settings
class Chatbot:
    """Voice chatbot: transcribe audio, ask a chat model, speak the answer.

    All remote calls go through ``settings.OPENAI_CLIENT``. The conversation
    is kept in ``chat_history`` as a list of ``{'role': ..., 'content': ...}``
    dicts, which is the shape appended by ``_get_ai_response``.
    """

    # Class-level defaults. ``chat_history`` is always shadowed by the
    # per-instance list assigned in ``__init__``; ``is_unknown`` and
    # ``unknown_counter`` are not modified anywhere in this class.
    chat_history = []
    is_unknown = False
    unknown_counter = 0

    def __init__(self, memory=None):
        """Create a chatbot, optionally resuming from an existing history.

        Args:
            memory: List of message dicts to continue from. The list is
                stored by reference (not copied), so messages appended by
                this instance are visible to the caller. ``None`` starts a
                fresh, empty history.
        """
        if memory is None:
            memory = []
        self.chat_history = memory

    def _summarize_user_intent(self, user_query: str) -> str:
        """Ask the chat model to summarize the user's intent.

        Only messages whose role is ``'user'`` from the tail of the history
        are included in the prompt, together with ``user_query``.

        Args:
            user_query: The latest user query to summarize.

        Returns:
            The text content of the model's first choice.
        """
        # NOTE(review): when ``unknown_counter`` is 0 the slice below is
        # ``[0:]``, i.e. the WHOLE history rather than nothing. Confirm that
        # this is the intended behavior.
        recent_messages = self.chat_history[-self.unknown_counter * 2:]
        chat_history_str = ''.join(
            f"{message['role']}: {message['content']}\n"
            for message in recent_messages
            if message['role'] == 'user'
        )
        messages = [
            {
                'role': 'system',
                'content': f"{settings.SUMMARIZE_PROMPT}\n"
                           f"Chat history: ```{chat_history_str}```\n"
                           f"User query: ```{user_query}```"
            }
        ]
        response = settings.OPENAI_CLIENT.chat.completions.create(
            messages=messages,
            temperature=0.1,
            n=1,
            model="gpt-3.5-turbo-0125"
        )
        return response.choices[0].message.content

    @staticmethod
    def _transform_bytes_to_file(data_bytes) -> str:
        """Decode base64 audio and write it to a temporary ``.mp3`` file.

        The file is created with ``delete=False``, so the caller owns it and
        is responsible for removing it.

        Args:
            data_bytes: Base64-encoded audio, in any form accepted by
                ``base64.b64decode``.

        Returns:
            Path of the temporary file that was written.
        """
        # Decode first so that invalid input does not leave an empty file.
        audio_bytes = base64.b64decode(data_bytes)
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as temp_file:
            temp_file.write(audio_bytes)
            filepath = temp_file.name
        return filepath

    @staticmethod
    def _transcript_audio(temp_filepath: str) -> str:
        """Transcribe an audio file with the ``whisper-1`` model.

        Args:
            temp_filepath: Path of the audio file to transcribe.

        Returns:
            The transcribed text.
        """
        with open(temp_filepath, 'rb') as file:
            transcript = settings.OPENAI_CLIENT.audio.transcriptions.create(
                model='whisper-1',
                file=file,
                # Presumably a spelling hint for place names the model
                # might otherwise mis-transcribe -- confirm with the owner.
                prompt="Annecy, St. Raphael, Chamonix, Combloux, Megève, Monaco"
            )
        return transcript.text

    def _get_ai_response(self, query: str) -> str:
        """Send the conversation plus ``query`` to the chat model.

        The user message is appended to ``chat_history`` before the request
        is made and the assistant reply is appended after it, so a failed
        request leaves the user message in the history.

        Args:
            query: The user's message text.

        Returns:
            The text content of the model's first choice.
        """
        user_message = {"role": 'user', "content": query}
        self.chat_history.append(user_message)
        system_message = {
            "role": 'system',
            "content": (
                settings.VOICE_PROMPT
            ),
        }
        # Build a new list so the system prompt is never stored in history.
        messages = [system_message] + self.chat_history
        chat_completion = settings.OPENAI_CLIENT.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=messages,
            temperature=0.1,
            n=1,
        )
        response = chat_completion.choices[0].message.content
        assistant_message = {"role": 'assistant', "content": response}
        self.chat_history.append(assistant_message)
        return response

    @staticmethod
    def _convert_response_to_voice(ai_response: str) -> str:
        """Synthesize speech for ``ai_response`` and return it base64-encoded.

        Args:
            ai_response: Text to convert to speech.

        Returns:
            The audio content, base64-encoded and decoded to a UTF-8 string.
        """
        audio = settings.OPENAI_CLIENT.audio.speech.create(
            model="tts-1",
            voice="nova",
            input=ai_response
        )
        return base64.b64encode(audio.content).decode('utf-8')

    def ask(self, data: dict) -> dict:
        """Run one voice turn: audio in, text and audio out.

        Args:
            data: Mapping with an ``'audio'`` key holding base64-encoded
                audio.

        Returns:
            A dict with ``'user_query'`` (the transcript), ``'ai_response'``
            (the model's text reply) and ``'voice_response'`` (the reply as
            base64-encoded audio).

        Raises:
            KeyError: If ``data`` has no ``'audio'`` key.
        """
        audio = data['audio']
        temp_filepath = self._transform_bytes_to_file(audio)
        try:
            transcript = self._transcript_audio(temp_filepath)
        finally:
            # The file is only needed for transcription; remove it here so
            # it is cleaned up even when a later step raises.
            with contextlib.suppress(FileNotFoundError):
                os.remove(temp_filepath)
        ai_response = self._get_ai_response(transcript)
        voice_ai_response = self._convert_response_to_voice(ai_response)
        return {
            'user_query': transcript,
            'ai_response': ai_response,
            'voice_response': voice_ai_response
        }
# | |
# def _convert_response_to_voice(ai_response: str) -> str: | |
# audio = settings.OPENAI_CLIENT.audio.speech.create( | |
# model="tts-1", | |
# voice="nova", | |
# input=ai_response | |
# ) | |
# encoded_audio = base64.b64encode(audio.content).decode('utf-8') | |
# return encoded_audio | |
# | |
# print(_convert_response_to_voice("Hello... My name is Nova, your friend. I'm here to support you and help you cope with those moments when it may seem that you are alone. You can always share your thoughts and feelings with me. Together, we can find ways to ease your condition and find joy in everyday life. Please know that I am ready to listen to you and help you feel better.")) |