Spaces:
Sleeping
Sleeping
import os | |
import re | |
import requests | |
import json | |
import gradio as gr | |
from langchain.chat_models import ChatOpenAI | |
from langchain import LLMChain, PromptTemplate | |
from langchain.memory import ConversationBufferMemory | |
OPENAI_API_KEY=os.getenv('OPENAI_API_KEY') | |
PLAY_HT_API_KEY=os.getenv('PLAY_HT_API_KEY') | |
PLAY_HT_USER_ID=os.getenv('PLAY_HT_USER_ID') | |
PLAY_HT_VOICE_ID=os.getenv('PLAY_HT_VOICE_ID') | |
play_ht_api_get_audio_url = "https://play.ht/api/v2/tts" | |
template = """You are a helpful assistant to answer user queries. | |
{chat_history} | |
User: {user_message} | |
Chatbot:""" | |
prompt = PromptTemplate( | |
input_variables=["chat_history", "user_message"], template=template | |
) | |
memory = ConversationBufferMemory(memory_key="chat_history") | |
llm_chain = LLMChain( | |
llm=ChatOpenAI(temperature='0.5', model_name="gpt-3.5-turbo"), | |
prompt=prompt, | |
verbose=True, | |
memory=memory, | |
) | |
headers = { | |
"accept": "text/event-stream", | |
"content-type": "application/json", | |
"AUTHORIZATION": "Bearer "+ PLAY_HT_API_KEY, | |
"X-USER-ID": PLAY_HT_USER_ID | |
} | |
def get_payload(text): | |
return { | |
"text": text, | |
"voice": PLAY_HT_VOICE_ID, | |
"quality": "medium", | |
"output_format": "mp3", | |
"speed": 1, | |
"sample_rate": 24000, | |
"seed": None, | |
"temperature": None | |
} | |
def get_generated_audio(text): | |
payload = get_payload(text) | |
generated_response = {} | |
try: | |
response = requests.post(play_ht_api_get_audio_url, json=payload, headers=headers) | |
response.raise_for_status() | |
generated_response["type"]= 'SUCCESS' | |
generated_response["response"] = response.text | |
except requests.exceptions.RequestException as e: | |
generated_response["type"]= 'ERROR' | |
try: | |
response_text = json.loads(response.text) | |
if response_text['error_message']: | |
generated_response["response"] = response_text['error_message'] | |
else: | |
generated_response["response"] = response.text | |
except Exception as e: | |
generated_response["response"] = response.text | |
except Exception as e: | |
generated_response["type"]= 'ERROR' | |
generated_response["response"] = response.text | |
return generated_response | |
def extract_urls(text): | |
# Define the regex pattern for URLs | |
url_pattern = r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+[/\w\.-]*' | |
# Find all occurrences of URLs in the text | |
urls = re.findall(url_pattern, text) | |
return urls | |
def get_audio_reply_for_question(text): | |
generated_audio_event = get_generated_audio(text) | |
#From get_generated_audio, you will get events in a string format, from that we need to extract the url | |
final_response = { | |
"audio_url": '', | |
"message": '' | |
} | |
if generated_audio_event["type"] == 'SUCCESS': | |
audio_urls = extract_urls(generated_audio_event["response"]) | |
if len(audio_urls) == 0: | |
final_response['message'] = "No audio file link found in generated event" | |
else: | |
final_response['audio_url'] = audio_urls[-1] | |
else: | |
final_response['message'] = generated_audio_event['response'] | |
return final_response | |
def download_url(url): | |
try: | |
# Send a GET request to the URL to fetch the content | |
final_response = { | |
'content':'', | |
'error':'' | |
} | |
response = requests.get(url) | |
# Check if the request was successful (status code 200) | |
if response.status_code == 200: | |
final_response['content'] = response.content | |
else: | |
final_response['error'] = f"Failed to download the URL. Status code: {response.status_code}" | |
except Exception as e: | |
final_response['error'] = f"Failed to download the URL. Error: {e}" | |
return final_response | |
def get_filename_from_url(url): | |
# Use os.path.basename() to extract the file name from the URL | |
file_name = os.path.basename(url) | |
return file_name | |
def get_text_response(user_message): | |
response = llm_chain.predict(user_message = user_message) | |
return response | |
def get_text_response_and_audio_response(user_message): | |
response = get_text_response(user_message) # Getting the reply from Open AI | |
audio_reply_for_question_response = get_audio_reply_for_question(response) | |
final_response = { | |
'output_file_path': '', | |
'message':'' | |
} | |
audio_url = audio_reply_for_question_response['audio_url'] | |
if audio_url: | |
output_file_path=get_filename_from_url(audio_url) | |
download_url_response = download_url(audio_url) | |
audio_content = download_url_response['content'] | |
if audio_content: | |
with open(output_file_path, "wb") as audio_file: | |
audio_file.write(audio_content) | |
final_response['output_file_path'] = output_file_path | |
else: | |
final_response['message'] = download_url_response['error'] | |
else: | |
final_response['message'] = audio_reply_for_question_response['message'] | |
return final_response | |
def chat_bot_response(message, history): | |
text_and_audio_response = get_text_response_and_audio_response(message) | |
output_file_path = text_and_audio_response['output_file_path'] | |
if output_file_path: | |
return (text_and_audio_response['output_file_path'],) | |
else: | |
return text_and_audio_response['message'] | |
demo = gr.ChatInterface(chat_bot_response,examples=["How are you doing?","What are your interests?","Which places do you like to visit?"]) | |
if __name__ == "__main__": | |
demo.launch() #To create a public link, set `share=True` in `launch()`. To enable errors and logs, set `debug=True` in `launch()`. | |