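# NOTE: page-status residue from where this file was copied
# ("Spaces: Runtime error / Runtime error"); it is not part of the program.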
import csv
import os
import uuid
from typing import Union

import pinecone
from openai import Client
from pinecone import Index

from gcp import *
import audio_model
# Fetch the TTS model when the 'tts_model' path does not exist yet. This runs
# before the star imports below; presumably `audio`/`video` expect the model to
# be on disk at import time -- TODO confirm before reordering.
if not os.path.exists('tts_model'):
    audio_model.download_model()

from audio import *
from video import *

# Module-level service handles shared by the functions below. All Pinecone
# settings are read from environment variables.
pinecone.init(
    api_key=os.getenv('PINECONE_API_KEY'),
    environment=os.getenv('PINECONE_ENV'),
)
INDEX = Index(os.getenv('PINECONE_INDEX'))
OPENAI_CLIENT = Client()
def add_data_table(table: list[list[str]], *data: str) -> tuple:
    """
    Add a row built from ``data`` to ``table`` and return the table plus reset values.

    Every stored row starts with a '❌' cell followed by the given values.
    A table whose first row consists only of empty strings is treated as empty:
    that placeholder row is overwritten instead of appending a second row.
    Some tabs have two data columns and others only one, so the number of
    values returned after the table differs.

    :param table: current rows of the table; it is modified in place
    :param data: cell values of the new row (three values for the greet tab)
    :return: a flat tuple ``(table, *reset_values)`` where the reset values are
        ``('', data[1], data[2])`` for three values, ``('', data[-1])`` when the
        last value is one of the known languages, and ``('', '')`` otherwise
    """
    known_languages = ('español', 'ingles', 'portugués')

    if len(data) == 3:  # It is the greet tab
        new_value = ('', *data[1:])
    # `data and ...` keeps a call without values from failing on data[-1]
    elif data and data[-1] in known_languages:
        new_value = ('', data[-1])
    else:
        new_value = ('', '')

    new_row = ['❌', *data]
    # The table only holds the blank placeholder row: replace it.
    # A table without any row at all simply receives its first row.
    if table and all(column == '' for column in table[0]):
        table[0] = new_row
    else:
        table.append(new_row)

    return (table, *new_value)
def remove_data_table(table: list[list[str]], evt: gr.SelectData) -> list[list[str]]:
    """
    Delete the clicked row of the table when the click landed on the first column.

    ``evt.index`` is read as ``(row, column)``. Clicks on any other column leave
    the table untouched. When the table has a single row, that row is not
    removed; it is replaced by a row of empty strings of the same width, which
    is how an empty table is represented.

    :param table: current rows of the table; it is modified in place
    :param evt: selection event whose ``index`` holds the clicked row and column
    :return: the same table object after the change
    """
    # Only the first column (the one with the X) triggers a deletion. A table
    # without rows has nothing to delete, so it is returned unchanged too.
    if evt.index[1] != 0 or not table:
        return table

    if len(table) == 1:
        # Keep one blank placeholder row instead of leaving the table empty
        table[0] = [''] * len(table[0])
    else:
        del table[evt.index[0]]
    return table
def add_language(languages: list[str]) -> tuple[dict, dict]:
    """
    Build two identical component updates offering the selected languages.

    Both updates list ``languages`` as choices, preselect the first one and
    make the component interactive.

    :param languages: languages chosen by the user
    :return: a pair of ``gr.update`` results, one per target component
    :raises gr.Error: when no language was selected (the message is in Spanish:
        "at least 1 language must be selected")
    """
    if not languages:
        raise gr.Error('Debe seleccionar al menos 1 idioma')

    # Each update gets its own copy of the choices list
    return (
        gr.update(choices=list(languages), value=languages[0], interactive=True),
        gr.update(choices=list(languages), value=languages[0], interactive=True),
    )
def _is_blank_row(row) -> bool:
    """Return True when every cell of a table row is empty (the placeholder row of an empty table)."""
    return all(cell in ('', None) for cell in row)


def create_chatbot(
        client: str, name: str, messages_table: list[list[str]], random_table, questions_table,
):
    """
    Build the assets of a chatbot for a client and register its question contexts.

    Steps, in order:
      1. group the fixed messages by type (greeting, goodbye, error) and language;
      2. write one single-column CSV per type/language under ``assets/<client>/<type>s/``;
      3. prepare the audio folder (audio generation itself is currently disabled);
      4. upload ``assets/<client>`` with ``upload_folder``;
      5. call ``get_video`` for every file found in the audio folder, writing the
         result to ``media/video``, and upload that folder;
      6. embed every question and upsert the vectors into the
         ``<client>-context`` namespace of the Pinecone index.

    Rows made only of empty cells are skipped in the messages and questions tables.

    :param client: client name; lower-cased and with spaces replaced by '-' to
        form the asset folder and namespace name
    :param name: chatbot name (currently not used by this function)
    :param messages_table: rows read as ``[_, type, text, ..., language]``
    :param random_table: rows of ``[_, text, language]``
    :param questions_table: rows of ``[_, question, context]``
    :return: a ``gr.Button`` with the text 'Chatbot created!!!'
    :raises KeyError: when a message row has a type or language label that is
        not in the translation tables below
    """
    translate_language = {'español': 'es', 'ingles': 'en', 'portugués': 'pt'}
    translate_greet = {'Saludo': 'greeting', 'Despedida': 'goodbye', 'Error': 'error'}

    # Set up general info
    client_name = client.lower().replace(' ', '-')
    base_dir = f'assets/{client_name}'
    audio_dir = f'{base_dir}/media/audio'
    video_dir = f'{base_dir}/media/video'

    # Group messages by their type (greeting, goodbye or error) and language
    messages = {}
    for message in messages_table:
        if _is_blank_row(message):
            continue
        type_msg = translate_greet[message[1]]
        language_msg = translate_language[message[-1]]
        messages.setdefault(type_msg, {}).setdefault(language_msg, []).append(message[2])

    # Create CSV files (greeting, goodbye and error). The file is written into
    # the same '<type>s' folder that gets created; previously the folder was
    # created with the trailing 's' but the file was opened without it.
    for type_msg, by_language in messages.items():
        type_dir = f'{base_dir}/{type_msg}s'
        os.makedirs(type_dir, exist_ok=True)
        for language, texts in by_language.items():
            # newline='' lets the csv module control line endings
            with open(f'{type_dir}/{language}.csv', mode='w', encoding='utf-8', newline='') as outfile:
                writer = csv.writer(outfile, delimiter=',')
                # One message per row; a bare string would be split into one
                # column per character.
                writer.writerows([text] for text in texts)

    # Create the audios (greeting, goodbye and error)
    # NOTE(review): the get_audio calls are disabled, so no audio file is
    # produced here and the video step only sees files already in the folder.
    os.makedirs(audio_dir, exist_ok=True)
    for type_msg, by_language in messages.items():
        for language, texts in by_language.items():
            for i, msg in enumerate(texts):
                full_path = f'{audio_dir}/{type_msg}_{language}_{i}.wav'
                # get_audio(msg, language, full_path)

    # Create the random audios
    # NOTE(review): `language` here is the label from the table (not the
    # 'es'/'en'/'pt' code used above) -- confirm before re-enabling get_audio.
    for i, (_, msg, language) in enumerate(random_table):
        full_path = f'{audio_dir}/random_{language}_{i}.wav'
        # get_audio(msg, language, full_path)

    # Upload files and audios to bucket in GCP
    upload_folder('clients-bella', base_dir)

    # Create videos, one per audio file, inside the video folder that is
    # uploaded right after (they used to be written into the audio folder).
    os.makedirs(video_dir, exist_ok=True)
    for audio_file in os.listdir(audio_dir):
        name_file = os.path.splitext(audio_file)[0]
        link_audio = get_link_file('clients-bella', client_name, 'audio', audio_file)
        get_video(link_audio, f'{video_dir}/{name_file}.mp4')

    # Upload videos to GCP
    upload_folder('clients-bella', video_dir)

    # Set up vectorstore: the question is embedded, the context is stored as metadata
    vectors = []
    for row in questions_table:
        if _is_blank_row(row):
            continue
        _, question, context = row
        vectors.append({
            "id": str(uuid.uuid4()),
            "values": _get_embedding(question),
            "metadata": {'Text': context},
        })
    # Nothing to send when no question was provided
    if vectors:
        INDEX.upsert(vectors=vectors, namespace=f'{client_name}-context')

    # Change text in the button
    return gr.Button(value='Chatbot created!!!', interactive=True)
def _get_embedding(sentence: str) -> list[float]:
    """
    Embed a sentence with the 'text-embedding-ada-002' model.

    :param sentence: text sent to the embeddings endpoint
    :return: the embedding of the first item in the response, as a list of floats
    """
    embeddings_api = OPENAI_CLIENT.embeddings
    result = embeddings_api.create(model='text-embedding-ada-002', input=sentence)
    first_item = result.data[0]
    return first_item.embedding