# -*- coding: utf-8 -*- """Anxiety_label_training_google.ipynb Automatically generated by Colaboratory. Original file is located at https://colab.research.google.com/drive/17f7DEZeKdrpQTPfqFe50SWnC-kIg3G-5 #Prediction of anxiety levels through text analysis #Transcript loading method When considering both the interviewer and the participant, the dataset is reduced to the sessions of 186 individuals, as 3 transcripts do not contain the text corresponding to Ellie, the virtual interviewer. """ import pandas as pd import re import glob """#Importing the required libraries""" import glob import pandas as pd import numpy as np import re import fnmatch import os import keras from keras.datasets import fashion_mnist from keras.models import Sequential, Model from keras.layers import Dense, Dropout, Embedding, LSTM, Input, Activation, GlobalAveragePooling1D, Flatten, Concatenate, Conv1D, MaxPooling1D from tensorflow.keras.layers import BatchNormalization from keras.layers import concatenate from keras.optimizers import SGD, RMSprop, Adagrad, Adam from keras.preprocessing.text import one_hot, text_to_word_sequence, Tokenizer from keras_preprocessing.sequence import pad_sequences from keras.callbacks import EarlyStopping, ModelCheckpoint from keras.utils.vis_utils import plot_model from nltk.corpus import stopwords from nltk.stem import SnowballStemmer from string import punctuation from scipy import stats from keras.utils.vis_utils import plot_model import matplotlib import matplotlib.pyplot as plt import itertools import gensim import nltk from nltk.stem import WordNetLemmatizer nltk.download('wordnet') nltk.download('stopwords') wordnet_lemmatizer = WordNetLemmatizer() labels=['none','mild','moderate','moderately severe', 'severe'] num_classes = len(labels) def plot_acc(history, title="Model Accuracy"): """Imprime una gráfica mostrando la accuracy por epoch obtenida en un entrenamiento""" plt.plot(history.history['accuracy']) plt.plot(history.history['val_accuracy']) plt.title(title) plt.ylabel('Accuracy') plt.xlabel('Epoch') plt.legend(['Train', 'Val'], loc='upper left') plt.show() def plot_loss(history, title="Model Loss"): """Imprime una gráfica mostrando la pérdida por epoch obtenida en un entrenamiento""" plt.plot(history.history['loss']) plt.plot(history.history['val_loss']) plt.title(title) plt.ylabel('Loss') plt.xlabel('Epoch') plt.legend(['Train', 'Val'], loc='upper right') plt.show() def plot_compare_losses(history1, history2, name1="Red 1", name2="Red 2", title="Graph title"): """Compara losses de dos entrenamientos con nombres name1 y name2""" plt.plot(history1.history['loss'], color="green") plt.plot(history1.history['val_loss'], 'r--', color="green") plt.plot(history2.history['loss'], color="blue") plt.plot(history2.history['val_loss'], 'r--', color="blue") plt.title(title) plt.ylabel('Loss') plt.xlabel('Epoch') plt.legend(['Train ' + name1, 'Val ' + name1, 'Train ' + name2, 'Val ' + name2], loc='upper right') plt.show() def plot_compare_accs(history1, history2, name1="Red 1", name2="Red 2", title="Graph title"): """Compara accuracies de dos entrenamientos con nombres name1 y name2""" plt.plot(history1.history['acc'], color="green") plt.plot(history1.history['val_acc'], 'r--', color="green") plt.plot(history2.history['acc'], color="blue") plt.plot(history2.history['val_acc'], 'r--', color="blue") plt.title(title) plt.ylabel('Accuracy') plt.xlabel('Epoch') plt.legend(['Train ' + name1, 'Val ' + name1, 'Train ' + name2, 'Val ' + name2], loc='lower right') plt.show() def plot_compare_multiple_metrics(history_array, names, colors, title="Graph title", metric='acc'): legend = [] for i in range(0, len(history_array)): plt.plot(history_array[i].history[metric], color=colors[i]) plt.plot(history_array[i].history['val_' + metric], 'r--', color=colors[i]) legend.append('Train ' + names[i]) legend.append('Val ' + names[i]) plt.title(title) plt.ylabel('Accuracy') plt.xlabel('Epoch') plt.axis plt.legend(legend, loc='lower right') plt.show() """#Loading and preprocessing of transcripts""" all_participants = pd.read_csv('all.csv', sep=',') all_participants.columns = ['index','personId', 'question', 'answer'] all_participants = all_participants.astype({"index": float, "personId": float, "question": str, "answer": str }) all_participants.head() """#Data analysis""" ds_len = len(all_participants) len_answers = [len(v) for v in all_participants['answer']] ds_max = max(len_answers) ds_min = min(len_answers) stats.describe(len_answers) plt.hist(len_answers) plt.show() """#Auxiliary functions for text processing Function taken from Kaggle for text cleaning """ # The function "text_to_wordlist" is from # https://www.kaggle.com/currie32/quora-question-pairs/the-importance-of-cleaning-text def text_to_wordlist(text, remove_stopwords=True, stem_words=False): # Clean the text, with the option to remove stopwords and to stem words. # Convert words to lower case and split them text = text.lower().split() # Optionally, remove stop words if remove_stopwords: stops = set(stopwords.words("english")) text = [wordnet_lemmatizer.lemmatize(w) for w in text if not w in stops ] text = [w for w in text if w != "nan" ] text = " ".join(text) # Clean the text text = re.sub(r"[^A-Za-z0-9^,!.\/'+-=]", " ", text) text = re.sub(r"what's", "what is ", text) text = re.sub(r"\'s", " ", text) text = re.sub(r"\'ve", " have ", text) text = re.sub(r"can't", "cannot ", text) text = re.sub(r"n't", " not ", text) text = re.sub(r"i'm", "i am ", text) text = re.sub(r"\'re", " are ", text) text = re.sub(r"\'d", " would ", text) text = re.sub(r"\'ll", " will ", text) text = re.sub(r",", " ", text) text = re.sub(r"\.", " ", text) text = re.sub(r"!", " ! ", text) text = re.sub(r"\/", " ", text) text = re.sub(r"\^", " ^ ", text) text = re.sub(r"\+", " + ", text) text = re.sub(r"\-", " - ", text) text = re.sub(r"\=", " = ", text) text = re.sub(r"\<", " ", text) text = re.sub(r"\>", " ", text) text = re.sub(r"'", " ", text) text = re.sub(r"(\d+)(k)", r"\g<1>000", text) text = re.sub(r":", " : ", text) text = re.sub(r" e g ", " eg ", text) text = re.sub(r" b g ", " bg ", text) text = re.sub(r" u s ", " american ", text) text = re.sub(r"\0s", "0", text) text = re.sub(r" 9 11 ", "911", text) text = re.sub(r"e - mail", "email", text) text = re.sub(r"j k", "jk", text) text = re.sub(r"\s{2,}", " ", text) # Optionally, shorten words to their stems if stem_words: text = text.split() stemmer = SnowballStemmer('english') stemmed_words = [stemmer.stem(word) for word in text] text = " ".join(stemmed_words) # Return a list of words return(text) nltk.download('omw-1.4') all_participants_mix = all_participants.copy() all_participants_mix['answer'] = all_participants_mix.apply(lambda row: text_to_wordlist(row.answer).split(), axis=1) words = [w for w in all_participants_mix['answer'].tolist()] words = set(itertools.chain(*words)) vocab_size = len(words) """Text cleaning Lemmatization Separation into vectors """ windows_size = 10 tokenizer = Tokenizer(num_words=vocab_size) tokenizer.fit_on_texts(all_participants_mix['answer']) tokenizer.fit_on_sequences(all_participants_mix['answer']) all_participants_mix['t_answer'] = tokenizer.texts_to_sequences(all_participants_mix['answer']) word_index = tokenizer.word_index word_size = len(word_index) all_participants_mix.drop(columns=['question'], inplace=True) answers = all_participants_mix.groupby('personId').agg(lambda x: x.tolist()) import itertools # group the remaining columns by 'personId' and convert each group to a list of lists answers = all_participants_mix.groupby('personId').agg(lambda x: x.tolist()) # flatten the list of lists in the 'answer' column answers['answer'] = answers['answer'].apply(lambda x: list(itertools.chain.from_iterable(x))) # flatten the list of lists in the 't_answer' column answers['t_answer'] = answers['t_answer'].apply(lambda x: list(itertools.chain.from_iterable(x))) answers windows_size = 10 cont = 0 phrases_lp = pd.DataFrame(columns=['personId','answer', 't_answer']) for p in answers.iterrows(): words = p[1]["answer"] size = len(words) word_tokens = p[1]["t_answer"] for i in range(size): sentence = words[i:min(i+windows_size,size)] tokens = word_tokens[i:min(i+windows_size,size)] phrases_lp.loc[cont] = [p[0], sentence, tokens] cont = cont + 1 def load_avec_dataset_file(path, score_column): ds = pd.read_csv(path, sep=',') ds['level'] = pd.cut(ds[score_column], bins=[-1,0,5,10,15,25], labels=[0,1,2,3,4]) ds['PHQ8_Score'] = ds[score_column] ds['cat_level'] = keras.utils.to_categorical(ds['level'], num_classes).tolist() ds = ds[['Participant_ID', 'level', 'cat_level', 'PHQ8_Score']] ds = ds.astype({"Participant_ID": float, "level": int, 'PHQ8_Score': int}) return ds def split_by_phq_level(ds): none_ds = ds[ds['level']==0] mild_ds = ds[ds['level']==1] moderate_ds = ds[ds['level']==2] moderate_severe_ds = ds[ds['level']==3] severe_ds = ds[ds['level']==4] return (none_ds, mild_ds, moderate_ds, moderate_severe_ds, severe_ds) def distribute_instances(ds): ds_shuffled = ds.sample(frac=1) none_ds, mild_ds, moderate_ds, moderate_severe_ds, severe_ds = split_by_phq_level(ds_shuffled) split = [70,14,16] eq_ds = {} prev_none = prev_mild = prev_moderate = prev_moderate_severe = prev_severe = 0 for p in split: last_none = min(len(none_ds), prev_none + round(len(none_ds) * p/100)) last_mild = min(len(mild_ds), prev_mild + round(len(mild_ds) * p/100)) last_moderate = min(len(moderate_ds), prev_moderate + round(len(moderate_ds) * p/100)) last_moderate_severe = min(len(moderate_severe_ds), prev_moderate_severe + round(len(moderate_severe_ds) * p/100)) last_severe = min(len(severe_ds), prev_severe + round(len(severe_ds) * p/100)) eq_ds["d"+str(p)] = pd.concat([none_ds[prev_none: last_none], mild_ds[prev_mild: last_mild], moderate_ds[prev_moderate: last_moderate], moderate_severe_ds[prev_moderate_severe: last_moderate_severe], severe_ds[prev_severe: last_severe]]) prev_none = last_none prev_mild = last_mild prev_moderate = last_moderate prev_moderate_severe = last_moderate_severe prev_severe = last_severe return (eq_ds["d70"], eq_ds["d14"], eq_ds["d16"]) def test_model(text, model): print(text) word_list = text_to_wordlist(text) sequences = tokenizer.texts_to_sequences([word_list]) sequences_input = list(itertools.chain(*sequences)) sequences_input = pad_sequences([sequences_input], value=0, padding="post", maxlen=windows_size).tolist() input_a = np.asarray(sequences_input) pred = model.predict(input_a, batch_size=None, verbose=0, steps=None) print(pred) predicted_class = np.argmax(pred) print(labels[predicted_class]) def confusion_matrix(model, x, y): prediction = model.predict(x, batch_size=None, verbose=0, steps=None) labels=['none','mild','moderate','moderately severe', 'severe'] max_prediction = np.argmax(prediction, axis=1) max_actual = np.argmax(y, axis=1) y_pred = pd.Categorical.from_codes(max_prediction, labels) y_actu = pd.Categorical.from_codes(max_actual, labels) return pd.crosstab(y_actu, y_pred) import pickle import pickle windows_size = 10 # Load the trained model with open('model_google.pkl', 'rb') as f: Mode = pickle.load(f) #def Test_model(text, Model): # word_list = text_to_wordlist(text) # sequences = tokenizer.texts_to_sequences([word_list]) # sequences_input = list(itertools.chain(*sequences)) # sequences_input = pad_sequences([sequences_input], value=0, padding="post", maxlen=windows_size).tolist() # input_a = np.asarray(sequences_input) # pred = Model.predict(input_a, batch_size=None, verbose=0, steps=None) #print(pred) #predicted_class = np.argmax(pred) #print(labels[predicted_class]) def Test_model(text, Model): #print(text) window_size = 10 word_list = text_to_wordlist(text) #print(word_list) sequences = tokenizer.texts_to_sequences([word_list]) sequences_input = list(itertools.chain(*sequences)) if len(sequences_input) <= window_size: sequences_input = pad_sequences([sequences_input], value=0, padding="post", maxlen=window_size).tolist() #print(sequences_input) input_a = np.asarray(sequences_input) pred = Modell.predict(input_a, batch_size=None, verbose=0, steps=None) #print(pred) predicted_class = np.argmax(pred) #print(labels[predicted_class]) else: predictions = [] for i in range(len(sequences_input) - window_size + 1): window_input = sequences_input[i : i + window_size] #print(window_input) input_a = np.asarray([window_input]) pred = Modell.predict(input_a, batch_size=None, verbose=0, steps=None) #print(pred) predictions.append(pred) accumulated_pred = np.sum(predictions, axis=0) predicted_class = np.argmax(np.sum(accumulated_pred, axis=0)) #print(labels[predicted_class]) import gradio as gr import pickle # Load the trained model with open('model_google.pkl', 'rb') as f: Modell = pickle.load(f) def predict(text): word_list = text_to_wordlist(text) sequences = tokenizer.texts_to_sequences([word_list]) sequences_input = list(itertools.chain(*sequences)) if len(sequences_input) <= window_size: sequences_input = pad_sequences([sequences_input], value=0, padding="post", maxlen=window_size).tolist() #print(sequences_input) input_a = np.asarray(sequences_input) pred = Modell.predict(input_a, batch_size=None, verbose=0, steps=None) #print(pred) predicted_class = np.argmax(pred) #print(labels[predicted_class]) else: predictions = [] for i in range(len(sequences_input) - window_size + 1): window_input = sequences_input[i : i + window_size] #print(window_input) input_a = np.asarray([window_input]) pred = Modell.predict(input_a, batch_size=None, verbose=0, steps=None) #print(pred) predictions.append(pred) accumulated_pred = np.sum(predictions, axis=0) predicted_class = np.argmax(np.sum(accumulated_pred, axis=0)) #print(labels[predicted_class]) return labels[predicted_class] input_text = gr.inputs.Textbox(label="Enter a sentence") output_text = gr.outputs.Textbox(label="Predicted label") iface = gr.Interface(fn=predict, inputs=input_text, outputs=output_text, title="Depression Severity Analysis", description="Enter texts to classify its depression severity.") iface.launch()