|
import pandas as pd |
|
import numpy as np |
|
from sklearn.feature_extraction.text import CountVectorizer |
|
from sklearn.tree import DecisionTreeClassifier |
|
from sklearn.model_selection import train_test_split |
|
from sklearn.feature_extraction.text import TfidfVectorizer |
|
from sklearn.neural_network import MLPClassifier |
|
from sklearn.ensemble import VotingClassifier |
|
from tensorflow.keras.models import Sequential |
|
from tensorflow.keras.layers import Embedding, LSTM, Dense |
|
from tensorflow.keras.preprocessing.text import Tokenizer |
|
from tensorflow.keras.preprocessing.sequence import pad_sequences |
|
import tensorflow as tf |
|
from joblib import dump |
|
from joblib import load |
|
from google.colab import drive |
|
import gdown |
|
from pydrive.auth import GoogleAuth |
|
from pydrive.drive import GoogleDrive |
|
import pickle |
|
import gradio as gr |
|
|
|
|
|
def train_creative_model(text_data):
    """Train a next-word prediction LSTM on the given dialog lines.

    Each line in ``text_data`` may contain tab-separated segments (prompt/answer
    pairs as written by the dialog log); each segment is tokenized independently.

    Parameters
    ----------
    text_data : list[str]
        Raw dialog lines to learn from.

    Returns
    -------
    tuple
        ``(model, tokenizer, None)`` on success, ``(None, None, None)`` when
        there is not enough data. The third slot is kept for interface
        compatibility with existing callers.
    """
    if not text_data:
        print("No hay suficientes datos para entrenar el modelo creativo.")
        return None, None, None

    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(text_data)
    total_words = len(tokenizer.word_index) + 1

    # BUG FIX: the original appended single token ids to input_sequences and
    # then built y = to_categorical(X), i.e. it trained the network to predict
    # its own input (an identity mapping). Build (current word -> next word)
    # pairs instead so the model actually learns next-word prediction, keeping
    # the single-token input expected by Embedding(..., input_length=1).
    inputs = []
    targets = []
    for line in text_data:
        for segment in line.split('\t'):
            token_list = tokenizer.texts_to_sequences([segment])[0]
            for i in range(len(token_list) - 1):
                inputs.append(token_list[i])
                targets.append(token_list[i + 1])

    if not inputs:
        print("No hay suficientes secuencias para entrenar el modelo creativo.")
        return None, None, None

    X = np.array(inputs)
    y = tf.keras.utils.to_categorical(targets, num_classes=total_words)

    # One-word context -> 50-dim embedding -> LSTM -> softmax over vocabulary.
    model = Sequential()
    model.add(Embedding(total_words, 50, input_length=1))
    model.add(LSTM(100))
    model.add(Dense(total_words, activation='softmax'))

    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    model.fit(X, y, epochs=50, verbose=0)

    return model, tokenizer, None
|
|
|
# --- Data loading and retrieval-model setup --------------------------------
file_path = 'dialogs.csv'
df = pd.read_csv(file_path)  # expected columns: 'Prompt', 'Answer'

# TF-IDF features over the prompts; this same fitted vectorizer instance is
# reused at inference time by get_combined_response and chat_interface.
vectorizer = TfidfVectorizer()

X = vectorizer.fit_transform(df['Prompt']).toarray()
y = df['Answer']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Fresh (unfitted) estimators, kept around for a later refit; the fitted
# ensemble actually used for inference is loaded from disk below.
tree_model = DecisionTreeClassifier()
nn_model = MLPClassifier(batch_size=32)

voting_clf = VotingClassifier(estimators=[('tree', tree_model), ('nn', nn_model)], voting='hard')

# SECURITY NOTE: pickle.load executes arbitrary code embedded in the file —
# only load model files from a trusted source.
with open('Voting_model.pkl', 'rb') as file:
    voting_model = pickle.load(file)
# BUG FIX: the original rebound `voting_model` here too, so `creative_model`
# (used later by chat_interface) was never defined.
with open('Creative_model.pkl', 'rb') as file:
    creative_model = pickle.load(file)
# NOTE(review): `creative_tokenizer` is referenced later but never defined in
# this file — presumably it should also be loaded from a pickle here; confirm
# what Creative_model.pkl actually contains.
|
|
|
def get_combined_response(prompt, voting_model, creative_model, tokenizer, creative_max_sequence_length):
    """Answer ``prompt`` with both the retrieval ensemble and the generative model.

    Uses the module-level ``vectorizer`` (fitted on df['Prompt']) and ``df``.

    Parameters
    ----------
    prompt : str
        Raw user input.
    voting_model : fitted classifier
        Hard-voting ensemble; its class labels are the answer strings of ``y``.
    creative_model, tokenizer, creative_max_sequence_length
        Passed through to generate_creative_text.

    Returns
    -------
    str
        Both answers joined into a single display string.
    """
    prompt_vector = vectorizer.transform([prompt]).toarray()
    # Hard voting predicts a class label, which here IS the answer text
    # (the classifier was trained with y = df['Answer']), so no second
    # dataframe lookup is needed for the answer itself.
    predicted_answer = voting_model.predict(prompt_vector)[0]

    # Seed the generative model with the prompt that maps to the predicted answer.
    seed_text = df.loc[df['Answer'] == predicted_answer, 'Prompt'].values[0]
    creative_response = generate_creative_text(seed_text, CREATIVE_NEXT_WORDS, creative_model, tokenizer, creative_max_sequence_length)

    # BUG FIX: corrected the misspelled "Awnser" labels in the user-facing string.
    return "Answer 1: " + predicted_answer + " // Answer 2: " + creative_response
|
|
|
|
|
def generate_creative_text(seed_text, next_words, model, tokenizer, max_sequence_length):
    """Greedily extend ``seed_text`` by ``next_words`` words using ``model``.

    Parameters
    ----------
    seed_text : str
        Starting text; it is re-tokenized (and grows) on every step.
    next_words : int
        Number of words to append.
    model
        Keras-style model whose ``predict`` returns a probability row over the
        vocabulary; the argmax index is taken as the next word.
    tokenizer
        Keras-style tokenizer exposing ``texts_to_sequences`` and ``word_index``.
    max_sequence_length : int | None
        When given, inputs are pre-padded to ``max_sequence_length - 1``.

    Returns
    -------
    str
        The seed text followed by the generated words.
    """
    # PERF: the original scanned the whole word_index for every generated word
    # (O(vocab) per step). Build the reverse lookup once instead.
    index_to_word = {index: word for word, index in tokenizer.word_index.items()}

    generated_text = seed_text
    for _ in range(next_words):
        token_list = tokenizer.texts_to_sequences([seed_text])[0]
        if max_sequence_length is not None:
            token_list = pad_sequences([token_list], maxlen=max_sequence_length - 1, padding='pre')
        else:
            token_list = [token_list]

        predicted_probabilities = model.predict(token_list, verbose=0)
        predicted = np.argmax(predicted_probabilities)

        # Index 0 (padding/OOV) has no word; fall back to "" like the
        # original linear scan did.
        output_word = index_to_word.get(int(predicted), "")

        seed_text += " " + output_word
        generated_text += " " + output_word

    return generated_text
|
|
|
|
|
|
|
|
|
|
|
|
|
# Maximum token-sequence length used when padding inputs for the creative model.
creative_max_sequence_length = 10

# NOTE(review): never read in the visible code paths — confirm it is still needed.
VOTING_RESPONSE_INDEX = 0

# Number of words the creative LSTM appends to its seed text per reply.
CREATIVE_NEXT_WORDS = 10
|
|
|
def chat_interface(user_input):
    """Gradio handler: answer ``user_input``, collect a console-side score,
    and fold low-scored corrections back into the training data.

    NOTE(review): the input() calls block the serving thread waiting on the
    console — this only works when the operator runs the app locally.

    Returns
    -------
    str
        The combined model response, shown in the Gradio output box.
    """
    # BUG FIX: these module-level objects are rebound below; without the
    # `global` declaration the rebindings raised UnboundLocalError.
    global df, X, y

    # BUG FIX: query the fitted ensemble loaded from disk (`voting_model`),
    # not the unfitted `voting_clf`, which would raise NotFittedError.
    response = get_combined_response(user_input, voting_model, creative_model, creative_tokenizer, creative_max_sequence_length)

    print(f"Model Response: {response}")

    # ENCODING FIX: the Spanish strings below were mojibake in the original
    # (UTF-8 read back with a wrong codec); restored the intended text.
    score = int(input(f"Puntuación para la respuesta '{response}': "))

    if score <= 2:
        correct_response = input(f"La respuesta actual es '{response}'. ¿Cuál es la respuesta correcta?: ")

        if correct_response.lower() != response.lower():
            # Record the correction in the in-memory dataset.
            # BUG FIX: DataFrame.append was removed in pandas 2.0; use pd.concat.
            new_data = {'Prompt': user_input, 'Answer': correct_response}
            df = pd.concat([df, pd.DataFrame([new_data])], ignore_index=True)

            # Also append to the raw tab-separated dialog log.
            with open('dialogs.txt', 'a') as dialogs_file:
                dialogs_file.write(f"{user_input}\t{correct_response}\n")

            # Extend the feature matrix / labels so a later refit sees the fix.
            new_X = vectorizer.transform([user_input]).toarray()
            new_y = [correct_response]
            X = np.concatenate((X, new_X))
            y = np.concatenate((y, new_y))

            # Persist the updated dialog dataset to disk.
            df.to_csv('dialogs.csv', index=False)

            print("¡Gracias por tu corrección! El modelo ha sido actualizado para mejorar. La próxima vez el modelo tendrá en cuenta tus respuestas correctas.")
        else:
            print("Entendido. No se necesita corrección.")
    else:
        print("¡Gracias por tu retroalimentación!")

    # BUG FIX: Gradio expects the handler to return the output value; the
    # original returned None, so the UI always showed an empty textbox.
    return response
|
|
|
|
|
# Wire the chat handler into a minimal text-in/text-out Gradio UI and start it.
iface = gr.Interface(fn=chat_interface, inputs="text", outputs="text")

iface.launch()
|
|