# -*- coding: utf-8 -*-
"""Anxiety_label_training_google.ipynb
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/17f7DEZeKdrpQTPfqFe50SWnC-kIg3G-5
#Prediction of anxiety levels through text analysis
#Transcript loading method
When considering both the interviewer and the participant, the dataset is reduced to the sessions of 186 individuals, as 3 transcripts do not contain the text corresponding to Ellie, the virtual interviewer.
"""
"""#Importing the required libraries"""
import glob
import pandas as pd
import numpy as np
import re
import fnmatch
import os
import keras
from keras.models import Sequential, Model
from keras.layers import Dense, Dropout, Embedding, LSTM, Input, Activation, GlobalAveragePooling1D, Flatten, Concatenate, Conv1D, MaxPooling1D
from tensorflow.keras.layers import BatchNormalization
from keras.layers import concatenate
from keras.optimizers import SGD, RMSprop, Adagrad, Adam
from keras.preprocessing.text import one_hot, text_to_word_sequence, Tokenizer
from keras_preprocessing.sequence import pad_sequences
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.utils.vis_utils import plot_model
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer
from string import punctuation
from scipy import stats
import matplotlib
import matplotlib.pyplot as plt
import itertools
import gensim
import nltk
from nltk.stem import WordNetLemmatizer
nltk.download('wordnet')
nltk.download('stopwords')
wordnet_lemmatizer = WordNetLemmatizer()
labels=['none','mild','moderate','moderately severe', 'severe']
num_classes = len(labels)
def plot_acc(history, title="Model Accuracy"):
    """Plot the accuracy per epoch obtained during a training run."""
    plt.plot(history.history['accuracy'])
    plt.plot(history.history['val_accuracy'])
    plt.title(title)
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Val'], loc='upper left')
    plt.show()
def plot_loss(history, title="Model Loss"):
    """Plot the loss per epoch obtained during a training run."""
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title(title)
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Val'], loc='upper right')
    plt.show()
def plot_compare_losses(history1, history2, name1="Network 1",
                        name2="Network 2", title="Graph title"):
    """Compare the losses of two training runs named name1 and name2."""
    plt.plot(history1.history['loss'], color="green")
    plt.plot(history1.history['val_loss'], linestyle='--', color="green")
    plt.plot(history2.history['loss'], color="blue")
    plt.plot(history2.history['val_loss'], linestyle='--', color="blue")
    plt.title(title)
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(['Train ' + name1, 'Val ' + name1,
                'Train ' + name2, 'Val ' + name2],
               loc='upper right')
    plt.show()
def plot_compare_accs(history1, history2, name1="Network 1",
                      name2="Network 2", title="Graph title"):
    """Compare the accuracies of two training runs named name1 and name2."""
    plt.plot(history1.history['accuracy'], color="green")
    plt.plot(history1.history['val_accuracy'], linestyle='--', color="green")
    plt.plot(history2.history['accuracy'], color="blue")
    plt.plot(history2.history['val_accuracy'], linestyle='--', color="blue")
    plt.title(title)
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(['Train ' + name1, 'Val ' + name1,
                'Train ' + name2, 'Val ' + name2],
               loc='lower right')
    plt.show()
def plot_compare_multiple_metrics(history_array, names, colors, title="Graph title", metric='accuracy'):
    """Compare a given metric across several training runs."""
    legend = []
    for i in range(0, len(history_array)):
        plt.plot(history_array[i].history[metric], color=colors[i])
        plt.plot(history_array[i].history['val_' + metric], linestyle='--', color=colors[i])
        legend.append('Train ' + names[i])
        legend.append('Val ' + names[i])
    plt.title(title)
    plt.ylabel(metric.capitalize())
    plt.xlabel('Epoch')
    plt.legend(legend,
               loc='lower right')
    plt.show()
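# Illustrative usage of the comparison helpers (commented out; `history_lstm` and `history_cnn`
# are assumed Keras History objects returned by model.fit elsewhere, they are not defined here):
# plot_compare_multiple_metrics([history_lstm, history_cnn], ['LSTM', 'CNN'],
#                               ['green', 'blue'], title='LSTM vs CNN', metric='accuracy')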
"""#Loading and preprocessing of transcripts"""
all_participants = pd.read_csv('all.csv', sep=',')
all_participants.columns = ['index','personId', 'question', 'answer']
all_participants = all_participants.astype({"index": float, "personId": float, "question": str, "answer": str })
all_participants.head()
"""#Data analysis"""
ds_len = len(all_participants)
len_answers = [len(v) for v in all_participants['answer']]
ds_max = max(len_answers)
ds_min = min(len_answers)
stats.describe(len_answers)
plt.hist(len_answers)
plt.show()
"""#Auxiliary functions for text processing
Function taken from Kaggle for text cleaning
"""
# The function "text_to_wordlist" is from
# https://www.kaggle.com/currie32/quora-question-pairs/the-importance-of-cleaning-text
def text_to_wordlist(text, remove_stopwords=True, stem_words=False):
# Clean the text, with the option to remove stopwords and to stem words.
# Convert words to lower case and split them
text = text.lower().split()
# Optionally, remove stop words
if remove_stopwords:
stops = set(stopwords.words("english"))
text = [wordnet_lemmatizer.lemmatize(w) for w in text if not w in stops ]
text = [w for w in text if w != "nan" ]
text = " ".join(text)
# Clean the text
text = re.sub(r"[^A-Za-z0-9^,!.\/'+-=]", " ", text)
text = re.sub(r"what's", "what is ", text)
text = re.sub(r"\'s", " ", text)
text = re.sub(r"\'ve", " have ", text)
text = re.sub(r"can't", "cannot ", text)
text = re.sub(r"n't", " not ", text)
text = re.sub(r"i'm", "i am ", text)
text = re.sub(r"\'re", " are ", text)
text = re.sub(r"\'d", " would ", text)
text = re.sub(r"\'ll", " will ", text)
text = re.sub(r",", " ", text)
text = re.sub(r"\.", " ", text)
text = re.sub(r"!", " ! ", text)
text = re.sub(r"\/", " ", text)
text = re.sub(r"\^", " ^ ", text)
text = re.sub(r"\+", " + ", text)
text = re.sub(r"\-", " - ", text)
text = re.sub(r"\=", " = ", text)
text = re.sub(r"\<", " ", text)
text = re.sub(r"\>", " ", text)
text = re.sub(r"'", " ", text)
text = re.sub(r"(\d+)(k)", r"\g<1>000", text)
text = re.sub(r":", " : ", text)
text = re.sub(r" e g ", " eg ", text)
text = re.sub(r" b g ", " bg ", text)
text = re.sub(r" u s ", " american ", text)
text = re.sub(r"\0s", "0", text)
text = re.sub(r" 9 11 ", "911", text)
text = re.sub(r"e - mail", "email", text)
text = re.sub(r"j k", "jk", text)
text = re.sub(r"\s{2,}", " ", text)
# Optionally, shorten words to their stems
if stem_words:
text = text.split()
stemmer = SnowballStemmer('english')
stemmed_words = [stemmer.stem(word) for word in text]
text = " ".join(stemmed_words)
# Return a list of words
return(text)
nltk.download('omw-1.4')
all_participants_mix = all_participants.copy()
all_participants_mix['answer'] = all_participants_mix.apply(lambda row: text_to_wordlist(row.answer).split(), axis=1)
words = [w for w in all_participants_mix['answer'].tolist()]
words = set(itertools.chain(*words))
vocab_size = len(words)
"""Text cleaning
Lemmatization
Separation into vectors
"""
windows_size = 10
# Fit the tokenizer on the cleaned answers and map each answer to a sequence of word indices
tokenizer = Tokenizer(num_words=vocab_size)
tokenizer.fit_on_texts(all_participants_mix['answer'])
all_participants_mix['t_answer'] = tokenizer.texts_to_sequences(all_participants_mix['answer'])
word_index = tokenizer.word_index
word_size = len(word_index)
all_participants_mix.drop(columns=['question'], inplace=True)
# Group the remaining columns by 'personId' and convert each group to a list of lists
answers = all_participants_mix.groupby('personId').agg(lambda x: x.tolist())
# flatten the list of lists in the 'answer' column
answers['answer'] = answers['answer'].apply(lambda x: list(itertools.chain.from_iterable(x)))
# flatten the list of lists in the 't_answer' column
answers['t_answer'] = answers['t_answer'].apply(lambda x: list(itertools.chain.from_iterable(x)))
answers
# Build overlapping phrases of up to windows_size words per participant
cont = 0
phrases_lp = pd.DataFrame(columns=['personId','answer', 't_answer'])
for p in answers.iterrows():
words = p[1]["answer"]
size = len(words)
word_tokens = p[1]["t_answer"]
for i in range(size):
sentence = words[i:min(i+windows_size,size)]
tokens = word_tokens[i:min(i+windows_size,size)]
phrases_lp.loc[cont] = [p[0], sentence, tokens]
cont = cont + 1
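# Illustrative example of the windowing above (hypothetical 4-word answer, windows_size=3):
#   words = ['i', 'feel', 'so', 'tired']
#   -> windows: ['i','feel','so'], ['feel','so','tired'], ['so','tired'], ['tired']
# i.e. one (possibly shorter) window per starting position, paired with its token ids.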
def load_avec_dataset_file(path, score_column):
    """Load a label CSV and bin `score_column` into the five severity levels."""
    ds = pd.read_csv(path, sep=',')
    ds['level'] = pd.cut(ds[score_column], bins=[-1,0,5,10,15,25], labels=[0,1,2,3,4])
    ds['PHQ8_Score'] = ds[score_column]
    ds['cat_level'] = keras.utils.to_categorical(ds['level'], num_classes).tolist()
    ds = ds[['Participant_ID', 'level', 'cat_level', 'PHQ8_Score']]
    ds = ds.astype({"Participant_ID": float, "level": int, 'PHQ8_Score': int})
    return ds
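# Illustrative usage (commented out; the file name and score column follow the usual
# DAIC-WOZ/AVEC label format and are assumptions, not read by this script):
# train_labels = load_avec_dataset_file('train_split_Depression_AVEC2017.csv', 'PHQ8_Score')
# print(train_labels.head())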
def split_by_phq_level(ds):
none_ds = ds[ds['level']==0]
mild_ds = ds[ds['level']==1]
moderate_ds = ds[ds['level']==2]
moderate_severe_ds = ds[ds['level']==3]
severe_ds = ds[ds['level']==4]
return (none_ds, mild_ds, moderate_ds, moderate_severe_ds, severe_ds)
def distribute_instances(ds):
ds_shuffled = ds.sample(frac=1)
none_ds, mild_ds, moderate_ds, moderate_severe_ds, severe_ds = split_by_phq_level(ds_shuffled)
split = [70,14,16]
eq_ds = {}
prev_none = prev_mild = prev_moderate = prev_moderate_severe = prev_severe = 0
for p in split:
last_none = min(len(none_ds), prev_none + round(len(none_ds) * p/100))
last_mild = min(len(mild_ds), prev_mild + round(len(mild_ds) * p/100))
last_moderate = min(len(moderate_ds), prev_moderate + round(len(moderate_ds) * p/100))
last_moderate_severe = min(len(moderate_severe_ds), prev_moderate_severe + round(len(moderate_severe_ds) * p/100))
last_severe = min(len(severe_ds), prev_severe + round(len(severe_ds) * p/100))
eq_ds["d"+str(p)] = pd.concat([none_ds[prev_none: last_none], mild_ds[prev_mild: last_mild], moderate_ds[prev_moderate: last_moderate], moderate_severe_ds[prev_moderate_severe: last_moderate_severe], severe_ds[prev_severe: last_severe]])
prev_none = last_none
prev_mild = last_mild
prev_moderate = last_moderate
prev_moderate_severe = last_moderate_severe
prev_severe = last_severe
return (eq_ds["d70"], eq_ds["d14"], eq_ds["d16"])
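# Illustrative usage (commented out): stratified 70/14/16 train/validation/test split of the
# labelled participants, assuming `train_labels` was loaded as sketched above.
# train_ds, val_ds, test_ds = distribute_instances(train_labels)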
def test_model(text, model):
print(text)
word_list = text_to_wordlist(text)
sequences = tokenizer.texts_to_sequences([word_list])
sequences_input = list(itertools.chain(*sequences))
sequences_input = pad_sequences([sequences_input], value=0, padding="post", maxlen=windows_size).tolist()
input_a = np.asarray(sequences_input)
pred = model.predict(input_a, batch_size=None, verbose=0, steps=None)
print(pred)
predicted_class = np.argmax(pred)
print(labels[predicted_class])
def confusion_matrix(model, x, y):
prediction = model.predict(x, batch_size=None, verbose=0, steps=None)
labels=['none','mild','moderate','moderately severe', 'severe']
max_prediction = np.argmax(prediction, axis=1)
max_actual = np.argmax(y, axis=1)
y_pred = pd.Categorical.from_codes(max_prediction, labels)
y_actu = pd.Categorical.from_codes(max_actual, labels)
return pd.crosstab(y_actu, y_pred)
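# Illustrative usage (commented out; `x_test` and `y_test` are assumed padded input and
# one-hot label arrays prepared elsewhere, they are not defined in this script):
# print(confusion_matrix(Modell, x_test, y_test))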
def Test_model(text, Model):
    """Predict the severity level of `text` with `Model`, using a sliding window of tokens."""
    windows_size = 10
    word_list = text_to_wordlist(text)
    sequences = tokenizer.texts_to_sequences([word_list])
    sequences_input = list(itertools.chain(*sequences))
    if len(sequences_input) <= windows_size:
        # Short input: pad to a single window and predict once
        sequences_input = pad_sequences([sequences_input], value=0, padding="post", maxlen=windows_size).tolist()
        input_a = np.asarray(sequences_input)
        pred = Model.predict(input_a, batch_size=None, verbose=0, steps=None)
        predicted_class = np.argmax(pred)
    else:
        # Long input: predict every windows_size-token window and sum the class scores
        predictions = []
        for i in range(len(sequences_input) - windows_size + 1):
            window_input = sequences_input[i : i + windows_size]
            input_a = np.asarray([window_input])
            pred = Model.predict(input_a, batch_size=None, verbose=0, steps=None)
            predictions.append(pred)
        accumulated_pred = np.sum(predictions, axis=0)
        predicted_class = np.argmax(np.sum(accumulated_pred, axis=0))
    return labels[predicted_class]
import gradio as gr
import pickle
from collections import Counter
# Load the trained model
with open('model_google.pkl', 'rb') as f:
Modell = pickle.load(f)
def predict(text):
    """Gradio handler: return the predicted severity label for the given text."""
    windows_size = 10
    word_list = text_to_wordlist(text)
    sequences = tokenizer.texts_to_sequences([word_list])
    sequences_input = list(itertools.chain(*sequences))
    if len(sequences_input) <= windows_size:
        # Short input: pad to a single window and predict once
        sequences_input = pad_sequences([sequences_input], value=0, padding="post", maxlen=windows_size).tolist()
        input_a = np.asarray(sequences_input)
        pred = Modell.predict(input_a, batch_size=None, verbose=0, steps=None)
        predicted_class = np.argmax(pred)
    else:
        # Long input: classify every windows_size-token window and take a majority vote
        predictions = []
        for i in range(len(sequences_input) - windows_size + 1):
            window_input = sequences_input[i : i + windows_size]
            input_a = np.asarray([window_input])
            pred = Modell.predict(input_a, batch_size=None, verbose=0, steps=None)
            predictions.append(np.argmax(pred))
        predicted_class = Counter(predictions).most_common(1)[0][0]
    return labels[predicted_class]
input_text = gr.Textbox(label="Enter a sentence")
output_text = gr.Textbox(label="Predicted label")
iface = gr.Interface(fn=predict, inputs=input_text, outputs=output_text, title="Depression Severity Analysis",
                     description="Enter text to classify its depression severity.")
iface.launch()