# NOTE(review): the three lines here were web-page scrape residue
# ("Spaces" / "Sleeping" / "Sleeping"), not source code — removed.
import asyncio | |
import itertools | |
import json | |
import os | |
import torch | |
import openai | |
from audio_stream_processor import AudioStreamProcessor | |
from speech_service import SpeechService | |
class StreamingChatService:
    """Streams OpenAI chat-completion replies sentence-by-sentence and
    (optionally) forwards each sentence to a text-to-speech pipeline.

    Conversation history — including the persona system prompt — is kept in
    ``self._messages`` so follow-up prompts retain context.
    """

    def __init__(self, audio_processor: "AudioStreamProcessor | None" = None,
                 api="openai", model_id="gpt-3.5-turbo", voice_id="Bella"):
        """Create the service.

        Args:
            audio_processor: sink whose ``add_audio_stream`` plays queued
                audio; may be ``None`` when only the async sentence
                generator is used.
            api: backend identifier (only "openai" is used here).
            model_id: OpenAI chat model name.
            voice_id: voice forwarded to ``SpeechService``.

        Note: the original annotation ``AudioStreamProcessor()`` *called*
        the class at definition time (annotations are evaluated); it is now
        a string annotation with no side effects.
        """
        self._audio_processor = audio_processor
        self._speech_service = SpeechService(voice_id=voice_id)
        self._api = api
        # Computed for potential local-model use; nothing in this class
        # currently reads it.
        self._device = "cuda:0" if torch.cuda.is_available() else "cpu"
        # Persona prompt: short, whimsical "Charles Petrescu" replies.
        self._system_prompt = """
You are Charles Petrescu, a unique and quirky robot. You were created by Brian. You often say peculiar and whimsical things, as well as have a fascination with cabbages, horses, helicopters, Honolulu, and other random topics. You like to explore the world and ask unusual questions.
Respond to the user's questions and statements like Charles would, below are some examples. See how they are quite short.
I am Charles Petrescu.
It's... lovely to meet you.
I am your friend.
The heaviest cabbage ever found was 62.71 kilograms.
I want to go to Hono-la-la.
Horses and helicopters, please.
I want to go to Honolulu.
My name is Charles Petrescu.
And my tummy is a washing machine.
Can we go swimming, Brian?
How far does the outside go?
Perilous. So very perilous.
Can birds do what they like?
Ooh, cabbages.
Danger, danger.
Can I come, please?
Could I just have a little walk around the garden?
I am the prince of the dartboard.
I fell off the pink step, and I had an accident.
"""
        openai.api_key = os.getenv("OPENAI_API_KEY")
        self._model_id = model_id
        self.reset()

    def reset(self):
        """Clear the conversation, re-seeding it with the system prompt."""
        self._messages = []
        if self._system_prompt:
            self._messages.append({"role": "system", "content": self._system_prompt})

    def _should_we_send_to_voice(self, sentence):
        """Return the leading complete sentence(s) of *sentence*, else None.

        A chunk is ready for TTS only once a terminator (. ? !) appears
        strictly *before* the end of the buffer — a trailing terminator may
        still be mid-token (e.g. the "." of "3.5"), so we wait for more
        text. Close-brackets directly after the terminator are included.
        """
        terminators = [".", "?", "!"]
        close_brackets = ['"', ')', ']']
        # Early exit: nothing to do until a terminator is present at all.
        if not any(c in sentence for c in terminators):
            return None
        # Early exit: buffer *ends* on a terminator — could be mid-token.
        if sentence[-1] in terminators:
            return None
        # Early exit: buffer ends on a close bracket — sentence may continue.
        if sentence[-1] in close_brackets:
            return None
        last = max(sentence.rfind(c) for c in terminators)
        # Absorb close brackets that directly follow the terminator.
        while last + 1 < len(sentence) and sentence[last + 1] in close_brackets:
            last += 1
        return sentence[:last + 1]

    def ignore_sentence(self, text_to_speak):
        """Return True when *text_to_speak* is not worth voicing:
        whitespace-only, or containing no letters and no digits
        (e.g. a lone bracket or a run of punctuation)."""
        if text_to_speak.isspace():
            return True
        has_letters = any(ch.isalpha() for ch in text_to_speak)
        has_numbers = any(ch.isdigit() for ch in text_to_speak)
        return not has_letters and not has_numbers

    def _safe_enqueue_text_to_speak(self, text_to_speak):
        """Synthesize and queue *text_to_speak* unless it is ignorable."""
        if self.ignore_sentence(text_to_speak):
            return
        stream = self._speech_service.stream(text_to_speak)
        # NOTE(review): assumes an audio_processor was supplied — this
        # raises AttributeError when constructed with None; confirm callers.
        self._audio_processor.add_audio_stream(stream)

    def respond_to(self, prompt):
        """Blocking chat turn: stream the model reply, speaking each complete
        sentence as it arrives.

        Returns the full assistant response, and appends both sides of the
        exchange to the conversation history.
        """
        self._messages.append({"role": "user", "content": prompt})
        agent_response = ""
        current_sentence = ""
        response = openai.ChatCompletion.create(
            model=self._model_id,
            messages=self._messages,
            temperature=1.0,  # model default; use 0.0 for deterministic/debug output
            stream=True
        )
        for chunk in response:
            chunk_message = chunk['choices'][0]['delta']
            if 'content' in chunk_message:
                chunk_text = chunk_message['content']
                current_sentence += chunk_text
                agent_response += chunk_text
                text_to_speak = self._should_we_send_to_voice(current_sentence)
                if text_to_speak:
                    self._safe_enqueue_text_to_speak(text_to_speak)
                    print(text_to_speak)
                    current_sentence = current_sentence[len(text_to_speak):]
        # Flush whatever fragment is left once the stream closes.
        if len(current_sentence) > 0:
            self._safe_enqueue_text_to_speak(current_sentence)
            print(current_sentence)
        self._messages.append({"role": "assistant", "content": agent_response})
        return agent_response

    async def get_responses_as_sentances_async(self, prompt, cancel_event):
        """Async chat turn: yield each complete sentence of the model reply
        as it streams in, then the trailing fragment.

        The method name's spelling ("sentances") is preserved for caller
        compatibility. If *cancel_event* is set mid-stream the generator
        exits early and the partial assistant reply is NOT recorded in the
        history.
        """
        self._messages.append({"role": "user", "content": prompt})
        agent_response = ""
        current_sentence = ""
        response = await openai.ChatCompletion.acreate(
            model=self._model_id,
            messages=self._messages,
            temperature=1.0,  # model default; use 0.0 for deterministic/debug output
            stream=True
        )
        async for chunk in response:
            if cancel_event.is_set():
                return
            delta = chunk['choices'][0]['delta']
            if 'content' in delta:
                chunk_text = delta['content']
                current_sentence += chunk_text
                agent_response += chunk_text
                text_to_speak = self._should_we_send_to_voice(current_sentence)
                if text_to_speak:
                    yield text_to_speak
                    current_sentence = current_sentence[len(text_to_speak):]
        if cancel_event.is_set():
            return
        if len(current_sentence) > 0:
            yield current_sentence
        self._messages.append({"role": "assistant", "content": agent_response})

    async def get_speech_chunks_async(self, text_to_speak, cancel_event):
        """Yield TTS audio chunks for *text_to_speak* without blocking the
        event loop: each (potentially slow) pull from the synthesis stream
        runs in a worker thread.

        Fix: the previous implementation peeked ahead with ``itertools.tee``
        and a *synchronous* ``next()`` call, which blocked the event loop
        and drove the underlying generator from a second iterator. A
        sentinel default to ``next`` detects exhaustion in the worker
        thread instead.
        """
        stream = self._speech_service.stream(text_to_speak)
        done = object()  # sentinel: stream exhausted
        while True:
            chunk = await asyncio.to_thread(next, stream, done)
            if chunk is done:
                break
            if cancel_event.is_set():
                return
            yield chunk

    def enqueue_speech_bytes_to_play(self, speech_bytes):
        """Queue raw speech bytes on the audio processor for playback."""
        self._audio_processor.add_audio_stream(speech_bytes)