File size: 10,365 Bytes
e0d9c8e
 
 
 
 
 
 
 
 
 
 
 
 
 
4df6e8a
e0d9c8e
 
f1e32a6
e0d9c8e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
797a248
e0d9c8e
 
797a248
e0d9c8e
 
 
 
797a248
 
e0d9c8e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
797a248
e0d9c8e
 
797a248
e0d9c8e
 
 
 
 
797a248
 
 
 
 
e0d9c8e
 
797a248
e0d9c8e
797a248
 
e0d9c8e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
import os
import csv
import uuid
import json
import logging
import pinecone
import gradio as gr
from PIL import Image
from typing import Union
from openai import Client
from pinecone import Index

from services import audio_model, gcp

# Download the TTS model only when the 'tts_model' path does not exist yet.
# NOTE(review): this runs before the services.audio / services.video imports below,
# presumably because those modules need the model at import time -- confirm.
if not os.path.exists('tts_model'):  # Get TTS model
    audio_model.download_model()
# get_audio and get_video (used by create_chatbot) are not defined or imported by name in
# this file, so they presumably come from these star imports -- verify against the modules.
from services.audio import *
from services.video import *


# Pinecone connection, configured through the PINECONE_API_KEY and PINECONE_ENV variables
pinecone.init(api_key=os.getenv('PINECONE_API_KEY'), environment=os.getenv('PINECONE_ENV'))
# Index that receives the question embeddings; its name is read from PINECONE_INDEX
INDEX = Index(os.getenv('PINECONE_INDEX'))
# OpenAI client used by _get_embedding; built without explicit arguments
# (assumes the credentials are picked up from the environment -- TODO confirm)
OPENAI_CLIENT = Client()
# Language label shown in the tables -> short language code used in file names
TRANSLATE_LANGUAGES = {'español': 'es', 'ingles': 'en', 'portugués': 'pt'}
# Message type label shown in the tables -> key used for the folder and audio file names
TRANSLATE_GREET = {'Saludo': 'greeting', 'Despedida': 'goodbye', 'Error': 'error'}


def add_data_table(table: list[list[str]], *data: str) -> tuple:
    """
    Adds the data to the table as a new row (prefixed with the ❌ delete marker) and
    builds the values returned next to the table. Some data consist of two columns
    others only one, so depending on that the returned values will be different:
      - three values (the greet tab): an empty string followed by the last two values
      - the last value is a language: an empty string followed by that language
      - anything else: two empty strings
    :param table: table to add the data to (it is modified in place)
    :param data: new row to be added to the table
    :return: flat tuple with the updated table followed by the values described above
    """
    if len(data) == 3:  # It is the greet tab
        new_value = '', *data[1:]
    # `data and ...` avoids an IndexError when the function is called without data
    elif data and data[-1] in ('español', 'ingles', 'portugués'):
        new_value = '', data[-1]
    else:
        new_value = '', ''

    new_row = ['❌', *data]

    # The table only has the blank placeholder row, do not append but replace it
    if table and all(column == '' for column in table[0]):
        table[0] = new_row

    # Add the new data (a table without any rows ends up here instead of failing on table[0])
    else:
        table.append(new_row)

    return table, *new_value


def remove_data_table(table: list[list[str]], evt: gr.SelectData) -> list[list[str]]:
    """
    Removes the clicked row from the table, but only when the click was on the first column.
    :param table: clicked table (it is modified in place)
    :param evt: the event, evt.index holds the (row, column) position of the click
    :return: updated table
    """
    clicked_first_column = evt.index[1] == 0

    # Only the first column (the one with the X) triggers a deletion
    if clicked_first_column:
        if len(table) != 1:
            # Several rows: drop the clicked one
            del table[evt.index[0]]
        else:
            # A single row is never deleted, it is reset to the blank default row
            table[0] = [''] * len(table[0])

    return table


def add_language(languages: list[str]) -> Union[gr.Error, tuple[gr.helpers, gr.helpers, gr.helpers]]:
    """
    Builds the updates for the three language dropdowns from the selected languages.
    Every dropdown gets the selected languages as choices and the first one as its value.
    :param languages: list of selected languages
    :return: three dropdown updates
    :raises gr.Error: when no language was selected
    """
    if not len(languages):
        raise gr.Error('Debe seleccionar al menos 1 idioma')

    default_language = languages[0]
    # Each update receives its own copy of the choices
    return tuple(
        gr.update(choices=list(languages), value=default_language, interactive=True)
        for _ in range(3)
    )


def create_chatbot(
        client: str, name: str, messages_table: list[list[str]], random_table: list[list[str]],
        questions_table: list[list[str]], image: Image
) -> gr.helpers:
    """
    Creation of the chatbot. It creates all the audios, videos and csv files for the given tables
    (greetings, goodbyes, errors and random) and uploads them to GCP, and it creates the
    vectorstore with the given questions and answers.
    :param client: name of the client (Nosotras, Visit Orlando, etc.)
    :param name: name of the chatbot (Bella, Roomie, etc.). It is not used yet
    :param messages_table: table with the greetings, goodbyes and errors messages
    :param random_table: table with the random data about the client
    :param questions_table: table with the questions and answers for each question
    :param image: image used as base for the videos
    :return: update for the value of a button (it lets the user know if the process is done
             or if there was an error while creating the videos)
    """
    # Set up general info
    client_name = client.lower().replace(' ', '-')
    # TODO: use the name of the chatbot (`name`)

    # Group messages by their type (greeting, goodbye or error) and language:
    # {type: {language: [message, ...]}}
    messages = dict()
    for message in messages_table:
        msg = message[1]
        type_msg = TRANSLATE_GREET[message[2]]
        language_msg = TRANSLATE_LANGUAGES[message[-1]]
        os.makedirs(f'assets/{client_name}/{type_msg}s', exist_ok=True)
        messages.setdefault(type_msg, dict()).setdefault(language_msg, []).append(msg)

    # Create CSV files (greeting, goodbye and error), one message per row
    for type_msg, messages_by_language in messages.items():
        for language, language_msgs in messages_by_language.items():
            path_csv = f'assets/{client_name}/{type_msg}s/{language}.csv'
            with open(path_csv, mode='w', encoding='utf-8', newline='') as outfile:
                csv.writer(outfile).writerows([msg] for msg in language_msgs)

    # Create the audios (greeting, goodbye and error)
    path_audios = f'assets/{client_name}/media/audio'
    os.makedirs(path_audios, exist_ok=True)
    for type_msg, messages_by_language in messages.items():
        for language, language_msgs in messages_by_language.items():
            for i, msg in enumerate(language_msgs):
                get_audio(msg, language, f'{path_audios}/{type_msg}_{language}_{i}')

    # Group random audios by their language: {language: [message, ...]}
    random_msgs = dict()
    for _, msg, language in random_table:
        random_msgs.setdefault(TRANSLATE_LANGUAGES[language], []).append(msg)

    # Create the random audios
    for language, language_msgs in random_msgs.items():
        for i, msg in enumerate(language_msgs):
            get_audio(msg, language, f'{path_audios}/random_{language}_{i}')

    # Save image
    os.makedirs(f'assets/{client_name}/media/image', exist_ok=True)
    image.save(f'assets/{client_name}/media/image/base.png')

    # Upload files and audios to bucket in GCP
    gcp.upload_folder(client_name, f'assets/{client_name}')

    # Create videos for the generated audios and the waiting video (it is muted)
    path_videos = f'assets/{client_name}/media/video'
    os.makedirs(path_videos, exist_ok=True)
    list_audios = os.listdir(path_audios) + ['waiting.wav']
    for audio_file in list_audios:
        name_file = audio_file.split('.')[0]
        link_audio = gcp.get_link_file(client_name, 'audio', audio_file)
        link_image = gcp.get_link_file(client_name, 'image', 'base.png')
        try:
            get_video(link_audio, link_image, f'{path_videos}/{name_file}')
        except Exception as e:
            # A gr.Error is only shown when it is raised, and raising it would skip the
            # button update below. gr.Warning shows the message and lets the function go on.
            gr.Warning(f'Problema con la creación del video, hable con el administrador. Error: {e}')
            logging.exception(e)  # Keeps the traceback in the log
            return gr.update(value='ERROR!', interactive=False)

    # Upload videos to GCP
    gcp.upload_folder(client_name, path_videos)

    # Set up vectorstore: one vector per question, the answer goes in the metadata
    vectors = [
        {
            "id": str(uuid.uuid4()),
            "values": _get_embedding(question),
            "metadata": {'Text': context},
        }
        for _, question, context in questions_table
    ]
    INDEX.upsert(vectors=vectors, namespace=f'{client_name}-context')

    # Change text in the button
    return gr.update(value='Chatbot created!!!', interactive=False)


def save_prompts(client: str, context_prompt: str, prompts_table: list[list[str]]) -> None:
    """
    Writes the standalone prompt and the prompt of each language to text files inside
    assets/<client>/prompts and uploads that folder through gcp.upload_folder.
    :param client: name of the client
    :param context_prompt: standalone prompt used to search into the vectorstore
    :param prompts_table: table with the prompt of each language
    :return: None
    """
    slug = client.lower().replace(' ', '-')
    prompts_dir = f'assets/{slug}/prompts'
    os.makedirs(prompts_dir, exist_ok=True)

    def write_prompt(file_name: str, content: str) -> None:
        # Every prompt is stored as a UTF-8 text file inside the prompts folder
        with open(f'{prompts_dir}/{file_name}', mode='w', encoding='utf-8') as outfile:
            outfile.write(content)

    # The standalone prompt is shared by all the languages
    write_prompt('prompt_standalone_q.txt', context_prompt)

    # One file per language, named with the short code of the language
    for _, prompt, language in prompts_table:
        write_prompt(f'prompt_{TRANSLATE_LANGUAGES[language]}.txt', prompt)

    gcp.upload_folder(slug, prompts_dir)


def generate_json(client: str, languages: list[str], max_num_questions: int, chatbot_name: str) -> gr.helpers:
    """
    Creates a json file with the environment variables used in the API
    :param client: name of the client, it is lowercased and its spaces are replaced with hyphens
    :param languages: selected languages, each one has to be a key of TRANSLATE_LANGUAGES
    :param max_num_questions: value stored as MAX_NUM_QUESTIONS
    :param chatbot_name: value stored as CHATBOT_NAME
    :return: gradio update with the value as the path of the json file
    """
    # Format the name and the languages (short codes separated by commas)
    short_languages = ','.join(TRANSLATE_LANGUAGES[language] for language in languages)
    client_name = client.lower().replace(' ', '-')

    json_object = json.dumps(
        {
            'CLIENT_NAME': client_name, 'MODEL_OPENAI': os.getenv('OPENAI_MODEL'), 'LANGUAGES': short_languages,
            'MAX_NUM_QUESTIONS': max_num_questions, 'NUM_VECTORS_CONTEXT': 10, 'THRESHOLD_RECYCLE': 0.97,
            'OPENAI_API_KEY': 'Check OpenAI for this', 'CHATBOT_NAME': chatbot_name, 'HAS_ROADMAP': 0,
            'SAVE_ANSWERS': 0, 'USE_RECYCLED_DATA': 1
        },
        indent=4
    )

    # The folder of the client may not exist yet if this is the first step that is run
    path_folder = f'assets/{client_name}'
    os.makedirs(path_folder, exist_ok=True)

    path_json = f"{path_folder}/chatbot_variables.json"
    with open(path_json, mode='w', encoding='utf-8') as outfile:
        outfile.write(json_object)

    return gr.update(value=path_json, label='Output file', interactive=True)


def _get_embedding(sentence: str) -> list[float]:
    """
    Requests the embedding of a text from the 'text-embedding-ada-002' model
    :param sentence: word/sentence/paragraph sent as input of the model
    :return: embedding of the first item in the response data
    """
    embeddings_api = OPENAI_CLIENT.embeddings
    result = embeddings_api.create(model='text-embedding-ada-002', input=sentence)
    first_item = result.data[0]
    return first_item.embedding