project_charles / chat_service.py
sohojoe's picture
move prompt engineering and history. WIP: asyncio RespondToPrompt
361f9d4
raw
history blame
4.53 kB
import asyncio
import itertools
import json
import os
import torch
import openai
class ChatService:
def __init__(self, api="openai", model_id = "gpt-3.5-turbo"):
self._api = api
self._device = "cuda:0" if torch.cuda.is_available() else "cpu"
openai.api_key = os.getenv("OPENAI_API_KEY")
self._model_id = model_id
def _should_we_send_to_voice(self, sentence):
sentence_termination_characters = [".", "?", "!"]
close_brackets = ['"', ')', ']']
temination_charicter_present = any(c in sentence for c in sentence_termination_characters)
# early exit if we don't have a termination character
if not temination_charicter_present:
return None
# early exit the last char is a termination character
if sentence[-1] in sentence_termination_characters:
return None
# early exit the last char is a close bracket
if sentence[-1] in close_brackets:
return None
termination_indices = [sentence.rfind(char) for char in sentence_termination_characters]
# Filter out termination indices that are not followed by whitespace or end of string
termination_indices = [i for i in termination_indices if sentence[i+1].isspace()]
last_termination_index = max(termination_indices)
# handle case of close bracket
while last_termination_index+1 < len(sentence) and sentence[last_termination_index+1] in close_brackets:
last_termination_index += 1
text_to_speak = sentence[:last_termination_index+1]
return text_to_speak
def ignore_sentence(self, text_to_speak):
# exit if empty, white space or an single breaket
if text_to_speak.isspace():
return True
# exit if not letters or numbers
has_letters = any(char.isalpha() for char in text_to_speak)
has_numbers = any(char.isdigit() for char in text_to_speak)
if not has_letters and not has_numbers:
return True
return False
async def get_responses_as_sentances_async(self, messages, cancel_event=None):
llm_response = ""
current_sentence = ""
delay = 0.1
while True:
try:
response = await openai.ChatCompletion.acreate(
model=self._model_id,
messages=messages,
temperature=1.0, # use 0 for debugging/more deterministic results
stream=True
)
async for chunk in response:
if cancel_event is not None and cancel_event.is_set():
return
chunk_message = chunk['choices'][0]['delta']
if 'content' in chunk_message:
chunk_text = chunk_message['content']
current_sentence += chunk_text
llm_response += chunk_text
text_to_speak = self._should_we_send_to_voice(current_sentence)
if text_to_speak:
current_sentence = current_sentence[len(text_to_speak):]
yield text_to_speak, True
else:
yield current_sentence, False
if cancel_event is not None and cancel_event.is_set():
return
if len(current_sentence) > 0:
yield current_sentence, True
return
except openai.error.APIError as e:
print(f"OpenAI API returned an API Error: {e}")
print(f"Retrying in {delay} seconds...")
await asyncio.sleep(delay)
delay *= 2
except openai.error.APIConnectionError as e:
print(f"Failed to connect to OpenAI API: {e}")
print(f"Retrying in {delay} seconds...")
await asyncio.sleep(delay)
delay *= 2
except openai.error.RateLimitError as e:
print(f"OpenAI API request exceeded rate limit: {e}")
print(f"Retrying in {delay} seconds...")
await asyncio.sleep(delay)
delay *= 2
except Exception as e:
print(f"OpenAI API unknown error: {e}")
print(f"Retrying in {delay} seconds...")
await asyncio.sleep(delay)
delay *= 2