Model_SA_Space / FunctionsModelSA_V1.py
cd14's picture
Updated model load_state_dict() in FunctionsModelSA_V1.py
1f98dd6
import s3fs
import pandas as pd
import numpy as np
from numpy import arange
from colour import Color
import plotly.graph_objects as go
from nltk import tokenize
from IPython.display import Markdown
from PIL import ImageColor
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import nltk
nltk.download('punkt')
import email
import codecs
import pickle
import string
from scipy import spatial
import re
import pytorch_lightning as pl
from bs4 import BeautifulSoup
import ipywidgets as widgets
from ipywidgets import FileUpload
from urlextract import URLExtract
from transformers import BertTokenizerFast as BertTokenizer, BertModel, BertConfig
import torch.nn as nn
import torch
from ipywidgets import interact, Dropdown
import boto3
from sagemaker import get_execution_role
from scipy import spatial
from ipyfilechooser import FileChooser
import random
PARAMS={
'BATCH_SIZE': 8,
'MAX_TOKEN_COUNT':100,
'BERT_MODEL_NAME':'google/bert_uncased_L-2_H-128_A-2' ,
'N_EPOCHS': 10,
'n_classes':8,
'LABEL_COLUMNS': ['label_analytical', 'label_casual', 'label_confident', 'label_friendly',
'label_joyful', 'label_optimistic', 'label_respectful',
'label_urgent'],
'TEXTCOL': 'text',
'rf_labels':['label_analytical', 'label_casual', 'label_confident',
'label_friendly', 'label_joyful', 'label_optimistic',
'label_respectful', 'label_urgent',
'industry_Academic and Education', 'industry_Energy',
'industry_Entertainment', 'industry_Finance and Banking',
'industry_Healthcare', 'industry_Hospitality', 'industry_Real Estate',
'industry_Retail', 'industry_Software and Technology',
'campaign_type_Abandoned_Cart', 'campaign_type_Engagement',
'campaign_type_Newsletter', 'campaign_type_Product_Announcement',
'campaign_type_Promotional', 'campaign_type_Review_Request',
'campaign_type_Survey', 'campaign_type_Transactional',
'campaign_type_Usage_and_Consumption', 'campaign_type_Webinar']
}
CI_rates=pd.read_csv('CI_RATES.csv')
### create file uploading widget
def email_upload():
print("Please upload your email (In EML Format)")
upload = FileUpload(accept='.eml', multiple=True)
display(upload)
return upload
def parse_email(uploaded_file):
check=[]
filename = list(uploaded_file.value.keys())[0]
email_body_str = codecs.decode(uploaded_file.value[filename]['content'], encoding="utf-8")
b=email.message_from_string(email_body_str)
for part in b.walk():
if part.get_content_type():
body = str(part.get_payload())
soup = BeautifulSoup(body)
paragraphs = soup.find_all('body')
for paragraph in paragraphs:
check.append(paragraph.text)
file="".join(check)
return file
def text_clean(x,punct=True):
### Light
x = x.lower() # lowercase everything
x = x.encode('ascii', 'ignore').decode() # remove unicode characters
x = re.sub(r'https*\S+', ' ', x) # remove links
x = re.sub(r'http*\S+', ' ', x)
# cleaning up text
x = re.sub(r'\'\w+', ' ', x)
x = re.sub(r'\w*\d+\w*', ' ', x)
x = re.sub(r'\s{2,}', ' ', x)
x = re.sub(r'\s[^\w\s]\s', ' ', x)
### Heavy
x = re.sub(r'@\S', ' ', x)
x = re.sub(r'#\S+', ' ', x)
x=x.replace('=',' ')
if(punct==True):
x = re.sub('[%s]' % re.escape(string.punctuation), ' ', x)
# remove single letters and numbers surrounded by space
x = re.sub(r'\s[a-z]\s|\s[0-9]\s', ' ', x)
clean=[' Â\x8a','\t','\n','Ã\x83','Â\x92','Â\x93','Â\x8a','Â\x95']
for y in clean:
x=x.replace(y,'')
return x
####BERT MODEL LOAD REQUIRMENTS#########
class ToneTagger(pl.LightningModule):
def __init__(self, n_classes: int, n_training_steps=None, n_warmup_steps=None):
super().__init__()
self.bert = BertModel.from_pretrained(PARAMS['BERT_MODEL_NAME'], return_dict=True)
self.classifier = nn.Linear(self.bert.config.hidden_size, n_classes)
self.n_training_steps = n_training_steps
self.n_warmup_steps = n_warmup_steps
self.criterion = nn.BCELoss()
def forward(self, input_ids, attention_mask):
output = self.bert(input_ids,attention_mask)
output = self.classifier(output.pooler_output)
output = torch.sigmoid(output)
return output
# LOAD IN PRE TRAINED MODEL WITH WEIGHTS
model=ToneTagger(8) # load up the model archetecture with 8 different tones
model.load_state_dict(torch.load("models/SAMODEL"), strict=False) # populate the weights of the model
model.eval()
def bert_tones(text_sentences,model):
""" This function takes in setences and the model cleaned them then predicts the bert tones"""
predictions=[]
text=[]
tokenizer = BertTokenizer.from_pretrained('google/bert_uncased_L-2_H-128_A-2')
for sent in text_sentences:
text.append(text_clean(sent,False))
cleaned_text=text_clean(sent)
encoding = tokenizer.encode_plus(
cleaned_text,
add_special_tokens=True,
max_length=100,
return_token_type_ids=False,
padding="max_length",
truncation=True,
return_attention_mask=True,
return_tensors='pt',
)
with torch.no_grad():
inputs=encoding['input_ids']
attention=encoding['attention_mask']
pred=model(inputs,attention)
pred=pred.cpu().numpy()
predictions.append(np.array(pred[0]))
return text,predictions
def convert_text_to_tone(text,model=model,params=PARAMS):
""" This Function will convert the text to tone, it takes in the text with punctuations seperates it into senteces"""
data=[]
# Find the sentiment from vader sentiment analyzer (Not currently in use)
sid_obj = SentimentIntensityAnalyzer()
total_cleaned=text_clean(text)
sentiment_dict = sid_obj.polarity_scores(total_cleaned)# Find the sentiment from
text_sentences=tokenize.sent_tokenize(text) #Find all the different sentences through the NLTK library
plain_text,predictions=bert_tones(text_sentences,model)
data.append([plain_text,sentiment_dict,predictions])
final=pd.DataFrame(data,columns=['text','sentiment','sentencetone'])
# print(final)
agg_tones=final['sentencetone'].apply(np.mean,axis=0)
tones=pd.DataFrame(agg_tones.tolist(),columns=params['LABEL_COLUMNS'])
return final,tones
### This will be abstracted away to a more dynamic model
brf='Rate_Models/bounce_rate_model.sav'
BRM = pickle.load(open(brf, 'rb'))
orf='Rate_Models/open_rate_model.sav'
ORM = pickle.load(open(orf, 'rb'))
urf='Rate_Models/unsubscribe_rate_model.sav'
URM = pickle.load(open(urf, 'rb'))
crf='Rate_Models/click_trough_rate_model.sav'
CRM = pickle.load(open(crf, 'rb'))
CV='Rate_Models/Conversion_rate.sav'
ConM = pickle.load(open(CV, 'rb'))
CTOR='Rate_Models/Click-To-Open_Rates.sav'
CTORM = pickle.load(open(CTOR, 'rb'))
RV='Rate_Models/Revenue_per_email.sav'
RVM = pickle.load(open(RV, 'rb'))
model_dict={'Open_Rate':ORM,
'Click_Through_Rate': CRM,
'Unsubscribe_Rate': URM,
'Bounce_Rate':BRM,
'Click_To_Open_Rate': CTORM,
'Conversion_Rate': ConM,
'Revenue_Per_Email':RVM}
## Plot confidence interval
def plot_CI(pred,lower,upper,scale_factor=0.5):
"""This function plots the confidence intervals of your prediction
pred- The prediction varaible given from the Random Forest for the target variable
lower- The lower half of the prediction confidence interval
upper- The upper half of the confidence interval
scale_factor- This will modify the size of the graph """
title=f'The Predicted Value is {pred}'
fig = go.Figure()
fig.update_xaxes(showgrid=False)
fig.update_yaxes(showgrid=False,
zeroline=True, zerolinecolor='black', zerolinewidth=3,
showticklabels=False)
fig.update_layout(height=200, plot_bgcolor='white')
fig.add_trace(go.Scatter(
x=[pred], y=[0,0], mode='markers', marker_size=10,line=dict(color="red")
))
fig.update_layout(xaxis_range=[0,upper+upper*scale_factor])
fig.update_layout(showlegend=False)
fig.add_vline(x=lower,annotation_text=f"{lower}",annotation_position="top")
fig.add_vline(x=upper,annotation_text=f"{upper}",annotation_position="top")
fig.add_vrect(lower,upper,fillcolor='red',opacity=0.25,annotation_text='95% CI',annotation_position="outside top")
fig.update_layout(title_text=title, title_x=0.5)
fig.show()
def find_max_cat(df,target,industry,campaign):
#### Select entries with the matching industry and campaign (1 == True)
d=df[(df[campaign]==1) & (df[industry]==1)]
if(len(d)>0):
rec=df.loc[d[target].idxmax()][3:11] ## Select the tone values for the best target values
return round(d[target].min(),3),round(d[target].max(),3),rec ## select the top target variable value and return with the tones
else:
return 0,0,0
def scale_values(val, tn): ## val = slider value, tn = current tone value
val = tn*100
return val
tone_labels = ['Analytical', 'Casual', 'Confident', 'Friendly', 'Joyful', 'Optimistic', 'Respectful', 'Urgent']
# ## Plot recommendations - ORIGINAL FROM V1.0
# def recommend(tones,recommend_changes,change,target):
# ''' This function creates the recomended changes plots it takes it the tones, the changes and '''
# fig = go.Figure()
# fig.add_trace(go.Bar(
# y=tones.columns,
# x=tones.values[0],
# name='Current Tones',
# orientation='h',
# # text=np.round(tones.values[0],3),
# width=.9,
# marker=dict(
# color='#00e6b1',
# line=dict(color='rgba(58, 71, 80, 1.0)', width=3)
# )
# ))
# fig.add_trace(go.Bar(
# y=tones.columns,
# x=recommend_changes,
# name='Recommend changes',
# orientation='h',
# text=np.round(recommend_changes,3),
# width=.5,
# marker=dict(
# color='#e60f00',
# line=dict(color='rgba(58, 71, 80, 1.0)', width=3)
# )
# ))
# fig.update_traces(textfont_size=18, textposition="outside", cliponaxis=False)
# fig.update_layout(height=1000, plot_bgcolor='white')
# fig.update_layout(barmode='stack', yaxis={'categoryorder':'array','categoryarray': recommend_changes.sort_values(key=abs,ascending=True).index})
# fig.update_layout(title_text=f'The following Changes will yield a {round(change,3)} increase in {target}')
# fig.show()
## Plot recommendations - MODIFIED
def recommend(tones,recommend_changes,change,target):
''' This function creates the recomended changes plots it takes it the tones, the changes and '''
fig = go.Figure()
fig.add_trace(go.Bar(
# y=tones.columns,
y=tone_labels,
x=recommend_changes,
name='Recommend changes',
orientation='h',
text=np.round(recommend_changes,3),
width=.5,
marker=dict(
color='#e60f00',
line=dict(color='rgba(58, 71, 80, 1.0)', width=1)
)
))
fig.update_traces(textfont_size=18, textposition="outside", cliponaxis=False)
# fig.update_layout(height=1000, plot_bgcolor='white')
# fig.update_layout(barmode='stack', yaxis={'categoryorder':'array','categoryarray': recommend_changes.sort_values(key=abs,ascending=True).index})
# fig.update_layout(title_text=f'The following Changes will yield a {round(change,3)} increase in {target}')
if target == 'Revenue_Per_Email':
out = f"${round(change,2)}"
else:
out = f"{round(change,2)*100}%"
fig.update_layout(title_text=f'The following Changes will yield a {out} increase in {target}')
fig.show()
def prediction(tones,campaign_val,industry_val,target):
model_val=pd.DataFrame(tones,columns=PARAMS['rf_labels']).fillna(0)
model_val.loc[0,campaign_val]=1
model_val.loc[0,industry_val]=1
model=model_dict[target]
pred=model.predict(model_val)[0]
# y_pred = regr.predict(X_test)
# r2_test = r2_score(y_test, y_pred)
CI=CI_rates[CI_rates['model']==target]
lower=pred+CI['2_5'].values[0]
higher=pred+CI['97_5'].values[0]
return pred,round(lower,3),round(higher,3),model
## Plot recommendations for intensity changes
def intensity_changes(tones,recommend_changes,change,target):
''' This function creates a plot to show the change made to intensities and shows the resulting change in target rate '''
fig = go.Figure()
fig.add_trace(go.Bar(
# y=tones.columns,
y=tone_labels,
x=recommend_changes,
name='Recommend changes',
orientation='h',
text=np.round(recommend_changes,3),
width=.5,
marker=dict(
color='#00e6b1',
line=dict(color='rgba(58, 71, 80, 1.0)', width=1)
)
))
fig.update_traces(textfont_size=18, textposition="outside", cliponaxis=False)
if change < 0:
if target == 'Revenue_Per_Email':
out = f"${round(change*(-1),2)}"
else:
out = f"{round(change*(-1),2)}%"
fig.update_layout(title_text=f'The following Changes will decrease the {target} by {out}')
elif change >= 0:
if target == 'Revenue_Per_Email':
out = f"${round(change,2)}"
else:
out = f"{round(change,2)*100}%"
fig.update_layout(title_text=f'The following Changes will increase the {target} by {out}')
# fig.update_layout(title_text=f'The changes made to the tone intensities')
fig.show()
def load_data():
data_location='Tone_and_target.csv'
df=pd.read_csv(data_location)
df_unique = df.drop_duplicates()
df_unique = pd.get_dummies(df_unique, columns=['industry','campaign_type'])
df_data=df_unique.drop(columns=['Unnamed: 0','body'])
df_data=df_data.rename(columns={'Click-To-Open Rates':'Click_To_Open_Rate','Conversion Rate':'Conversion_Rate','Revenue Per email':'Revenue_Per_Email'})
return df_data
def plot_table(sorted_setence_tuple):
""" Plots the bottom most table, takes in a list of tuples where the tuple is the sentence the sentiment distance
from the best values """
sentences=list(zip(*sorted_setence_tuple))[0]
scores= list(zip(*sorted_setence_tuple))[1]
colors= list(zip(*sorted_setence_tuple))[2]
rbg_list=[]
for i in colors:
rbg_list.append('rgb'+str(i))
fig = go.Figure(data=[go.Table(
header=dict(values=['<b>Sentences</b>', '<b>Difference from Recommended Tone</b>'],
line_color = 'darkslategray',
fill_color = '#010405',
align = 'center',
font=dict(family="Metropolis",color='white', size=16)),
cells=dict(values=[sentences, # 1st column
scores] , # 2nd column
line_color='darkslategray',
fill_color=[rbg_list],
align=['left','center'],
font=dict(family="Arial",size=12)))
])
#fig.show()
def corrections(best,df):
"""This function finds the the difference between the tone of each sentence and the best tone for the desired metric
best- tone values of the best email for the current categories
df- dataframe of the sentences of the uploaded email and the """
sentence_order=[]
colors=['#48f0c9','#6ef5d6','#94f7e1','#bbfaec','#e6fff9','#ffe7e6','#ffc3bf','#ffa099','#ff7c73','#ff584d'] #loxz green primary to Loxz light red
for i in range(len(df['sentencetone'][0])):
text=df['text'][0][i]
cur=df['sentencetone'][0][i]
cosine_distance= spatial.distance.cosine(best,cur)
distance=cosine_distance # Cosine distance
new_value = round(( (distance - 0) / (1 - 0) ) * (100 - 0) + 0) # for distance metric this is just normalizing the varaible
color_value=round(( (distance - 0) / (1 - 0) ) * (10 - 0) + 0) # Color whell value
col=colors[(color_value)]
rbg=ImageColor.getcolor(f'{col}', "RGB")
sentence_order.append((text,new_value,rbg))
sorted_sentences=sorted(sentence_order,key=lambda x: x[1],reverse=True)
plot_table(sorted_sentences)
def read_file(fc):
with open(fc.selected) as file: # Use file to refer to the file object
data = file.read()
check=[]
b=email.message_from_string(data)
for part in b.walk():
if part.get_content_type():
body = str(part.get_payload())
soup = BeautifulSoup(body)
paragraphs = soup.find_all('body')
for paragraph in paragraphs:
check.append(paragraph.text)
file="".join(check)
return file