import json
import os

import torch
import openai

from audio_stream_processor import AudioStreamProcessor
from speech_service import SpeechService

class StreamingChatService:
    """Streams an OpenAI chat completion and hands completed sentences to a text-to-speech service."""

    def __init__(self, audio_processor: AudioStreamProcessor = None, api="openai", model_id="gpt-3.5-turbo", voice_id="Bella"):
        self._audio_processor = audio_processor
        self._speech_service = SpeechService(voice_id=voice_id)
        self._api = api
        self._device = "cuda:0" if torch.cuda.is_available() else "cpu"
        self._system_prompt = None

        openai.api_key = os.getenv("OPENAI_API_KEY")
        self._model_id = model_id
        self.reset()

    def reset(self):
        """Clear the conversation history, keeping the system prompt if one is set."""
        self._messages = []
        if self._system_prompt:
            self._messages.append({"role": "system", "content": self._system_prompt})

    def _should_we_send_to_voice(self, sentence):
        """Return the leading complete sentence(s) in the buffer if they are ready to be spoken, else None."""
        sentence_termination_characters = [".", "?", "!"]
        close_brackets = ['"', ')', ']']

        termination_character_present = any(c in sentence for c in sentence_termination_characters)

        # early exit if we don't have a termination character yet
        if not termination_character_present:
            return None

        # early exit if the last char is a termination character; wait in case a closing
        # quote or bracket arrives in the next chunk
        if sentence[-1] in sentence_termination_characters:
            return None

        # early exit if the last char is a close bracket; wait until more of the sentence arrives
        if sentence[-1] in close_brackets:
            return None

        # find the last termination character in the buffer
        termination_indices = [sentence.rfind(char) for char in sentence_termination_characters]
        last_termination_index = max(termination_indices)

        # extend over any closing quotes/brackets that immediately follow the termination character
        while last_termination_index + 1 < len(sentence) and sentence[last_termination_index + 1] in close_brackets:
            last_termination_index += 1

        text_to_speak = sentence[:last_termination_index + 1]
        return text_to_speak
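
    # Example of the buffering behaviour above (hypothetical buffer contents):
    #   _should_we_send_to_voice('Hi there. How')  -> 'Hi there.'
    #   _should_we_send_to_voice('Hi there.')      -> None   (a closing quote/bracket may still arrive)
    #   _should_we_send_to_voice('"Hi there." I')  -> '"Hi there."'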

    def respond_to(self, prompt):
        """Send the prompt to the model, streaming the reply and speaking it sentence by sentence."""
        self._messages.append({"role": "user", "content": prompt})
        agent_response = ""
        current_sentence = ""

        # streaming chat completion via the legacy (pre-1.0) OpenAI SDK interface
        response = openai.ChatCompletion.create(
            model=self._model_id,
            messages=self._messages,
            temperature=1.0,  # lower toward 0.0 for more deterministic results when debugging
            stream=True
        )

        for chunk in response:
            chunk_message = chunk['choices'][0]['delta']
            if 'content' in chunk_message:
                chunk_text = chunk_message['content']
                current_sentence += chunk_text
                agent_response += chunk_text

                # speak each sentence as soon as it is complete
                text_to_speak = self._should_we_send_to_voice(current_sentence)
                if text_to_speak:
                    stream = self._speech_service.stream(text_to_speak)
                    self._audio_processor.add_audio_stream(stream)
                    print(text_to_speak)
                    current_sentence = current_sentence[len(text_to_speak):]

        # speak whatever remains in the buffer once the stream ends
        if len(current_sentence) > 0:
            stream = self._speech_service.stream(current_sentence)
            self._audio_processor.add_audio_stream(stream)
            print(current_sentence)

        self._messages.append({"role": "assistant", "content": agent_response})
        return agent_response
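

# Example usage (a minimal sketch): assumes AudioStreamProcessor() needs no constructor
# arguments and that OPENAI_API_KEY is set in the environment.
if __name__ == "__main__":
    processor = AudioStreamProcessor()
    chat = StreamingChatService(audio_processor=processor, model_id="gpt-3.5-turbo", voice_id="Bella")
    chat.respond_to("Tell me a short story about streaming audio.")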