import asyncio
import itertools
import json
import os

import openai
import torch

from audio_stream_processor import AudioStreamProcessor
from speech_service import SpeechService


class StreamingChatService:
    """Stream chat completions and forward them to speech, sentence by sentence.

    The service keeps the running conversation in ``self._messages`` (seeded
    with the system prompt), requests streamed completions through the
    ``openai`` module, splits the incoming text into speakable sentences and
    passes them to ``SpeechService.stream``.
    """

    # Characters that end a sentence, and closers that may directly follow one.
    _SENTENCE_TERMINATORS = (".", "?", "!")
    _CLOSING_BRACKETS = ('"', ")", "]")

    def __init__(
        self,
        audio_processor: AudioStreamProcessor = None,
        api="openai",
        model_id="gpt-3.5-turbo",
        voice_id="Bella",
    ):
        """Configure the service and start a fresh conversation.

        Args:
            audio_processor: Object whose ``add_audio_stream`` receives the
                synthesized audio. May be ``None``, but then ``respond_to``
                and ``enqueue_speech_bytes_to_play`` cannot enqueue audio.
            api: Name of the backing API; stored on the instance only.
            model_id: Model name passed to the chat completion calls.
            voice_id: Voice identifier passed to ``SpeechService``.
        """
        self._audio_processor = audio_processor
        self._speech_service = SpeechService(voice_id=voice_id)
        self._api = api
        self._device = "cuda:0" if torch.cuda.is_available() else "cpu"
        # Set this to None to run without a system prompt.
        # NOTE: the pieces below concatenate to exactly the original prompt
        # text, including its leading space and trailing " \n".
        self._system_prompt = (
            " You are Charles Petrescu, a unique and quirky robot. "
            "You were created by Brian. "
            "You often say peculiar and whimsical things, as well as have a "
            "fascination with cabbages, horses, helicopters, Honolulu, and "
            "other random topics. "
            "You like to explore the world and ask unusual questions. "
            "Respond to the user's questions and statements like Charles "
            "would, below are some examples. "
            "See how they are quite short. "
            "I am Charles Petrescu. "
            "It's... lovely to meet you. "
            "I am your friend. "
            "The heaviest cabbage ever found was 62.71 kilograms. "
            "I want to go to Hono-la-la. "
            "Horses and helicopters, please. "
            "I want to go to Honolulu. "
            "My name is Charles Petrescu. "
            "And my tummy is a washing machine. "
            "Can we go swimming, Brian? "
            "How far does the outside go? "
            "Perilous. So very perilous. "
            "Can birds do what they like? "
            "Ooh, cabbages. "
            "Danger, danger. "
            "Can I come, please? "
            "Could I just have a little walk around the garden? "
            "I am the prince of the dartboard. "
            "I fell off the pink step, and I had an accident. \n"
        )

        openai.api_key = os.getenv("OPENAI_API_KEY")
        self._model_id = model_id
        self.reset()

    def reset(self):
        """Drop the conversation history, keeping only the system prompt (if any)."""
        self._messages = []
        if self._system_prompt:
            self._messages.append({"role": "system", "content": self._system_prompt})

    def _should_we_send_to_voice(self, sentence):
        """Return the leading part of ``sentence`` that is ready to be spoken.

        Text is considered ready only when a terminator (``.``, ``?``, ``!``)
        is followed by at least one character that is neither a terminator
        nor a closing bracket/quote. Waiting for that extra character keeps
        things like ``...`` or ``."`` together.

        Returns:
            The prefix up to and including the last terminator (plus any
            closing brackets/quotes directly after it), or ``None`` when
            nothing should be spoken yet.
        """
        terminators = self._SENTENCE_TERMINATORS
        closers = self._CLOSING_BRACKETS

        # Nothing to speak until some terminator has arrived.
        if not any(mark in sentence for mark in terminators):
            return None

        # The sentence may still be growing ("...", '!"'), so wait for more text.
        last_char = sentence[-1]
        if last_char in terminators or last_char in closers:
            return None

        cut = max(sentence.rfind(mark) for mark in terminators)
        # Keep closing brackets/quotes attached to the sentence they close.
        while cut + 1 < len(sentence) and sentence[cut + 1] in closers:
            cut += 1
        return sentence[: cut + 1]

    def ignore_sentence(self, text_to_speak):
        """Return True when ``text_to_speak`` has no letters and no digits.

        This covers empty strings, pure whitespace and lone punctuation or
        brackets, none of which are worth sending to speech synthesis.
        """
        return not any(ch.isalpha() or ch.isdigit() for ch in text_to_speak)

    def _safe_enqueue_text_to_speak(self, text_to_speak):
        """Synthesize ``text_to_speak`` and queue the audio, skipping unspeakable text."""
        if self.ignore_sentence(text_to_speak):
            return
        stream = self._speech_service.stream(text_to_speak)
        self._audio_processor.add_audio_stream(stream)

    def respond_to(self, prompt):
        """Send ``prompt`` to the model and speak the reply as it streams in.

        Each completed sentence is enqueued for speech and printed as soon as
        it is available. The user prompt and the full reply are appended to
        the conversation history.

        Returns:
            The complete assistant reply as a single string.
        """
        self._messages.append({"role": "user", "content": prompt})
        agent_response = ""
        current_sentence = ""

        response = openai.ChatCompletion.create(
            model=self._model_id,
            messages=self._messages,
            temperature=1.0,  # lower towards 0.0 for debugging / more deterministic results
            stream=True,
        )

        for chunk in response:
            delta = chunk['choices'][0]['delta']
            if 'content' not in delta:
                continue
            chunk_text = delta['content']
            current_sentence += chunk_text
            agent_response += chunk_text
            text_to_speak = self._should_we_send_to_voice(current_sentence)
            if text_to_speak:
                self._safe_enqueue_text_to_speak(text_to_speak)
                print(text_to_speak)
                # Keep only the part that has not been spoken yet.
                current_sentence = current_sentence[len(text_to_speak):]

        # Flush whatever is left once the stream ends.
        if current_sentence:
            self._safe_enqueue_text_to_speak(current_sentence)
            print(current_sentence)
        self._messages.append({"role": "assistant", "content": agent_response})
        return agent_response

    async def get_responses_as_sentances_async(self, prompt, cancel_event):
        """Yield the model's reply to ``prompt`` one speakable sentence at a time.

        Args:
            prompt: User text; appended to the conversation history.
            cancel_event: Object with ``is_set()``. Once set, the generator
                stops early and the partial reply is NOT added to the history.

        Yields:
            Sentence-sized strings, followed by any trailing remainder.
        """
        self._messages.append({"role": "user", "content": prompt})
        agent_response = ""
        current_sentence = ""

        response = await openai.ChatCompletion.acreate(
            model=self._model_id,
            messages=self._messages,
            temperature=1.0,  # lower towards 0.0 for debugging / more deterministic results
            stream=True,
        )

        async for chunk in response:
            if cancel_event.is_set():
                return
            delta = chunk['choices'][0]['delta']
            if 'content' not in delta:
                continue
            chunk_text = delta['content']
            current_sentence += chunk_text
            agent_response += chunk_text
            text_to_speak = self._should_we_send_to_voice(current_sentence)
            if text_to_speak:
                yield text_to_speak
                current_sentence = current_sentence[len(text_to_speak):]

        if cancel_event.is_set():
            return
        if current_sentence:
            yield current_sentence
        self._messages.append({"role": "assistant", "content": agent_response})

    async def get_speech_chunks_async(self, text_to_speak, cancel_event):
        """Yield synthesized speech chunks for ``text_to_speak``.

        Each chunk is pulled from the speech stream in a worker thread so
        that the pull itself does not block the event loop. Iteration ends
        when the stream is exhausted or produces ``None``; it stops early
        (without yielding the chunk just fetched) once ``cancel_event`` is set.
        """
        # iter() lets this work whether stream() returns an iterator or any
        # other iterable.
        stream = iter(self._speech_service.stream(text_to_speak))
        while True:
            # The default of None avoids StopIteration escaping the worker thread.
            chunk = await asyncio.to_thread(next, stream, None)
            if chunk is None:
                break
            if cancel_event.is_set():
                return
            yield chunk

    def enqueue_speech_bytes_to_play(self, speech_bytes):
        """Hand already-synthesized speech to the audio processor for playback."""
        self._audio_processor.add_audio_stream(speech_bytes)