import codecs
import email
import functools
import pickle
import random
import re
import string

import boto3
import ipywidgets as widgets
import nltk
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import pytorch_lightning as pl
import s3fs
import torch
import torch.nn as nn
from bs4 import BeautifulSoup
from colour import Color
from ipyfilechooser import FileChooser
from IPython.display import Markdown, display
from ipywidgets import Dropdown, FileUpload, interact
from nltk import tokenize
from numpy import arange
from PIL import ImageColor
from sagemaker import get_execution_role
from scipy import spatial
from transformers import BertConfig, BertModel
from transformers import BertTokenizerFast as BertTokenizer
from urlextract import URLExtract
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Sentence tokenisation (nltk.tokenize.sent_tokenize) needs the punkt data.
nltk.download('punkt')

PARAMS = {
    'BATCH_SIZE': 8,
    'MAX_TOKEN_COUNT': 100,
    'BERT_MODEL_NAME': 'google/bert_uncased_L-2_H-128_A-2',
    'N_EPOCHS': 10,
    'n_classes': 8,
    'LABEL_COLUMNS': [
        'label_analytical',
        'label_casual',
        'label_confident',
        'label_friendly',
        'label_joyful',
        'label_optimistic',
        'label_respectful',
        'label_urgent',
    ],
    'TEXTCOL': 'text',
    'rf_labels': [
        'label_analytical',
        'label_casual',
        'label_confident',
        'label_friendly',
        'label_joyful',
        'label_optimistic',
        'label_respectful',
        'label_urgent',
        'industry_Academic and Education',
        'industry_Energy',
        'industry_Entertainment',
        'industry_Finance and Banking',
        'industry_Healthcare',
        'industry_Hospitality',
        'industry_Real Estate',
        'industry_Retail',
        'industry_Software and Technology',
        'campaign_type_Abandoned_Cart',
        'campaign_type_Engagement',
        'campaign_type_Newsletter',
        'campaign_type_Product_Announcement',
        'campaign_type_Promotional',
        'campaign_type_Review_Request',
        'campaign_type_Survey',
        'campaign_type_Transactional',
        'campaign_type_Usage_and_Consumption',
        'campaign_type_Webinar',
    ],
}

CI_rates = pd.read_csv('CI_RATES.csv')


### create file uploading widget
def email_upload():
    """Display a file-upload widget restricted to ``.eml`` files.

    Returns:
        The ``FileUpload`` widget, so the caller can read ``.value`` once
        the user has uploaded something.
    """
    print("Please upload your email (In EML Format)")
    upload = FileUpload(accept='.eml', multiple=True)
    display(upload)
    return upload


def _message_body_text(message):
    """Return the concatenated text of every ``<body>`` element in *message*.

    Each non-multipart part's payload is parsed as HTML and the text of any
    ``<body>`` tags found is collected.
    """
    collected = []
    for part in message.walk():
        # A multipart container's payload is a list of sub-messages, not
        # text; str() of that list is an object repr.  walk() visits the
        # sub-parts themselves, so the container can simply be skipped.
        if part.is_multipart():
            continue
        payload = str(part.get_payload())
        # NOTE(review): no parser is passed to BeautifulSoup, so the result
        # depends on which parser is installed -- confirm before pinning one.
        soup = BeautifulSoup(payload)
        collected.extend(tag.text for tag in soup.find_all('body'))
    return "".join(collected)


def parse_email(uploaded_file):
    """Extract the body text from the first file held by an upload widget.

    Args:
        uploaded_file: object whose ``.value`` maps file names to dicts
            carrying the raw bytes under ``'content'``.

    Returns:
        The concatenated text of every ``<body>`` element found in the
        message parts (an empty string if none is found).

    Raises:
        IndexError: if nothing has been uploaded.
    """
    filename = list(uploaded_file.value.keys())[0]
    raw_message = codecs.decode(
        uploaded_file.value[filename]['content'], encoding="utf-8"
    )
    return _message_body_text(email.message_from_string(raw_message))


def text_clean(x, punct=True):
    """Normalise raw text before it is tokenised.

    Args:
        x: the text to clean.
        punct: when true, every ``string.punctuation`` character is replaced
            by a space as well.

    Returns:
        The lower-cased, ASCII-only, cleaned text.
    """
    ### Light
    x = x.lower()  # lowercase everything
    x = x.encode('ascii', 'ignore').decode()  # remove unicode characters
    x = re.sub(r'https*\S+', ' ', x)  # remove links
    x = re.sub(r'http*\S+', ' ', x)
    # cleaning up text
    x = re.sub(r'\'\w+', ' ', x)
    x = re.sub(r'\w*\d+\w*', ' ', x)
    x = re.sub(r'\s{2,}', ' ', x)
    x = re.sub(r'\s[^\w\s]\s', ' ', x)
    ### Heavy
    # NOTE(review): this removes only '@' plus ONE following character, while
    # the '#' rule below removes the whole token -- confirm that is intended.
    x = re.sub(r'@\S', ' ', x)
    x = re.sub(r'#\S+', ' ', x)
    x = x.replace('=', ' ')
    if punct:
        x = re.sub('[%s]' % re.escape(string.punctuation), ' ', x)
    # remove single letters and numbers surrounded by space
    x = re.sub(r'\s[a-z]\s|\s[0-9]\s', ' ', x)
    # The non-ASCII entries below cannot match any more, because the ASCII
    # round-trip above already dropped such characters; only the tab and
    # newline entries still have an effect.
    clean = [' Â\x8a', '\t', '\n', 'Ã\x83', 'Â\x92', 'Â\x93', 'Â\x8a', 'Â\x95']
    for fragment in clean:
        x = x.replace(fragment, '')
    return x


####BERT MODEL LOAD REQUIREMENTS#########
class ToneTagger(pl.LightningModule):
    """BERT encoder followed by a linear layer and a sigmoid per class."""

    def __init__(self, n_classes: int, n_training_steps=None, n_warmup_steps=None):
        super().__init__()
        self.bert = BertModel.from_pretrained(PARAMS['BERT_MODEL_NAME'], return_dict=True)
        self.classifier = nn.Linear(self.bert.config.hidden_size, n_classes)
        self.n_training_steps = n_training_steps
        self.n_warmup_steps = n_warmup_steps
        self.criterion = nn.BCELoss()

    def forward(self, input_ids, attention_mask):
        """Return one sigmoid score per class for each input sequence."""
        output = self.bert(input_ids, attention_mask=attention_mask)
        output = self.classifier(output.pooler_output)
        return torch.sigmoid(output)


# LOAD IN PRE TRAINED MODEL WITH WEIGHTS
# Build the architecture with one output per tone ...
model = ToneTagger(PARAMS['n_classes'])
# ... then populate its weights.  map_location="cpu" lets a checkpoint saved
# on a GPU load on a CPU-only host; inference below runs on CPU either way.
# NOTE(review): strict=False silently ignores missing/unexpected keys.
model.load_state_dict(torch.load("models/SAMODEL", map_location="cpu"), strict=False)
model.eval()


@functools.lru_cache(maxsize=None)
def _load_tokenizer(model_name):
    """Load the tokenizer for *model_name* once and reuse it afterwards."""
    return BertTokenizer.from_pretrained(model_name)


def bert_tones(text_sentences, model):
    """Clean each sentence and predict its tone scores.

    Args:
        text_sentences: iterable of sentence strings.
        model: callable taking ``(input_ids, attention_mask)`` tensors and
            returning a tensor of scores, e.g. a ``ToneTagger``.

    Returns:
        A ``(text, predictions)`` pair of equal-length lists: the sentences
        cleaned with punctuation kept, and one NumPy score vector per
        sentence.
    """
    predictions = []
    text = []
    tokenizer = _load_tokenizer(PARAMS['BERT_MODEL_NAME'])
    for sent in text_sentences:
        # Punctuation is kept in the returned text; the model input has it
        # stripped.
        text.append(text_clean(sent, False))
        cleaned_text = text_clean(sent)
        encoding = tokenizer.encode_plus(
            cleaned_text,
            add_special_tokens=True,
            max_length=PARAMS['MAX_TOKEN_COUNT'],
            return_token_type_ids=False,
            padding="max_length",
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        with torch.no_grad():
            pred = model(encoding['input_ids'], encoding['attention_mask'])
        # The batch holds a single sentence, so keep row 0 only.
        predictions.append(np.array(pred.cpu().numpy()[0]))
    return text, predictions


def convert_text_to_tone(text, model=model, params=PARAMS):
    """Convert a text into per-sentence and aggregated tone scores.

    The text (punctuation included) is split into sentences with NLTK, each
    sentence is scored by ``bert_tones``, and the per-sentence scores are
    averaged.

    Args:
        text: the full text to analyse.
        model: tone model handed to ``bert_tones``.
        params: settings dict; ``params['LABEL_COLUMNS']`` names the columns
            of the aggregated result.

    Returns:
        ``(final, tones)``: ``final`` is a one-row DataFrame with columns
        ``text`` (cleaned sentences), ``sentiment`` (VADER polarity scores of
        the cleaned text) and ``sentencetone`` (per-sentence score vectors);
        ``tones`` is a one-row DataFrame of the mean score per label.
    """
    # VADER sentiment is stored in the result but not used further here.
    sid_obj = SentimentIntensityAnalyzer()
    sentiment_dict = sid_obj.polarity_scores(text_clean(text))
    # Find all the different sentences through the NLTK library
    text_sentences = tokenize.sent_tokenize(text)
    plain_text, predictions = bert_tones(text_sentences, model)
    final = pd.DataFrame(
        [[plain_text, sentiment_dict, predictions]],
        columns=['text', 'sentiment', 'sentencetone'],
    )
    # Mean over the sentence axis of the per-sentence score vectors.
    agg_tones = final['sentencetone'].apply(np.mean, axis=0)
    tones = pd.DataFrame(agg_tones.tolist(), columns=params['LABEL_COLUMNS'])
    return final, tones


### This will be abstracted away to a more dynamic model
# SECURITY: pickle.load executes arbitrary code from the file; only load
# model files that come from a trusted source.
brf = 'Rate_Models/bounce_rate_model.sav'
with open(brf, 'rb') as _model_file:
    BRM = pickle.load(_model_file)
orf = 'Rate_Models/open_rate_model.sav'
with open(orf, 'rb') as _model_file:
    ORM = pickle.load(_model_file)
# SECURITY: pickle.load executes arbitrary code from the file; only load
# model files that come from a trusted source.
urf = 'Rate_Models/unsubscribe_rate_model.sav'
with open(urf, 'rb') as _model_file:
    URM = pickle.load(_model_file)
crf = 'Rate_Models/click_trough_rate_model.sav'
with open(crf, 'rb') as _model_file:
    CRM = pickle.load(_model_file)
CV = 'Rate_Models/Conversion_rate.sav'
with open(CV, 'rb') as _model_file:
    ConM = pickle.load(_model_file)
CTOR = 'Rate_Models/Click-To-Open_Rates.sav'
with open(CTOR, 'rb') as _model_file:
    CTORM = pickle.load(_model_file)
RV = 'Rate_Models/Revenue_per_email.sav'
with open(RV, 'rb') as _model_file:
    RVM = pickle.load(_model_file)

model_dict = {
    'Open_Rate': ORM,
    'Click_Through_Rate': CRM,
    'Unsubscribe_Rate': URM,
    'Bounce_Rate': BRM,
    'Click_To_Open_Rate': CTORM,
    'Conversion_Rate': ConM,
    'Revenue_Per_Email': RVM,
}


## Plot confidence interval
def plot_CI(pred, lower, upper, scale_factor=0.5):
    """Plot a prediction together with its confidence interval.

    Args:
        pred: the predicted value of the target variable.
        lower: lower bound of the confidence interval.
        upper: upper bound of the confidence interval.
        scale_factor: extra x-axis room right of ``upper``, as a fraction of
            ``upper`` (the axis runs from 0 to ``upper * (1 + scale_factor)``).
    """
    title = f'The Predicted Value is {pred}'
    fig = go.Figure()
    fig.update_xaxes(showgrid=False)
    fig.update_yaxes(
        showgrid=False,
        zeroline=True,
        zerolinecolor='black',
        zerolinewidth=3,
        showticklabels=False,
    )
    fig.update_layout(height=200, plot_bgcolor='white')
    # A single marker on the zero line; x and y must have the same length.
    fig.add_trace(go.Scatter(
        x=[pred],
        y=[0],
        mode='markers',
        marker_size=10,
        line=dict(color="red"),
    ))
    fig.update_layout(xaxis_range=[0, upper + upper * scale_factor])
    fig.update_layout(showlegend=False)
    fig.add_vline(x=lower, annotation_text=f"{lower}", annotation_position="top")
    fig.add_vline(x=upper, annotation_text=f"{upper}", annotation_position="top")
    fig.add_vrect(
        lower,
        upper,
        fillcolor='red',
        opacity=0.25,
        annotation_text='95% CI',
        annotation_position="outside top",
    )
    fig.update_layout(title_text=title, title_x=0.5)
    fig.show()


def find_max_cat(df, target, industry, campaign):
    """Find the best ``target`` value for an industry/campaign combination.

    Args:
        df: DataFrame with indicator columns named by ``industry`` and
            ``campaign`` (1 == True) and a ``target`` column.
        target: name of the column to maximise.
        industry: name of the industry indicator column.
        campaign: name of the campaign indicator column.

    Returns:
        ``(min, max, rec)``: the minimum and maximum ``target`` among the
        matching rows, rounded to 3 decimals, and ``rec``, the values at
        column positions 3 to 10 of the row holding the maximum (the tone
        values, per the original comment).  ``(0, 0, 0)`` when no row
        matches.
    """
    # Select entries with the matching industry and campaign (1 == True)
    matches = df[(df[campaign] == 1) & (df[industry] == 1)]
    if matches.empty:
        return 0, 0, 0
    # Positional slice: it depends on the column order of ``df``.
    rec = df.loc[matches[target].idxmax()].iloc[3:11]
    return round(matches[target].min(), 3), round(matches[target].max(), 3), rec


def scale_values(val, tn):
    """Return the tone value ``tn`` scaled by 100.

    ``val`` (the slider value) is accepted for interface compatibility but
    is not used in the result.
    """
    return tn * 100


tone_labels = ['Analytical', 'Casual', 'Confident', 'Friendly', 'Joyful', 'Optimistic', 'Respectful', 'Urgent']


def _format_change(change, target):
    """Format ``change`` as dollars for revenue, as a whole percent otherwise."""
    if target == 'Revenue_Per_Email':
        return f"${round(change, 2)}"
    # Round to two decimals first, then scale; ':.0f' hides the float noise
    # that a bare ``round(change, 2) * 100`` prints (7.000000000000001).
    return f"{round(change, 2) * 100:.0f}%"


def _tone_change_figure(recommend_changes, bar_color):
    """Build the horizontal bar chart of per-tone changes."""
    fig = go.Figure()
    fig.add_trace(go.Bar(
        y=tone_labels,
        x=recommend_changes,
        name='Recommend changes',
        orientation='h',
        text=np.round(recommend_changes, 3),
        width=.5,
        marker=dict(
            color=bar_color,
            line=dict(color='rgba(58, 71, 80, 1.0)', width=1),
        ),
    ))
    fig.update_traces(textfont_size=18, textposition="outside", cliponaxis=False)
    return fig


## Plot recommendations - MODIFIED
def recommend(tones, recommend_changes, change, target):
    """Plot the recommended tone changes and the increase they yield.

    Args:
        tones: current tones (unused; kept for interface compatibility).
        recommend_changes: one change value per entry of ``tone_labels``.
        change: resulting change in the target metric.
        target: name of the target metric; ``'Revenue_Per_Email'`` is shown
            in dollars, every other target as a percentage.
    """
    fig = _tone_change_figure(recommend_changes, '#e60f00')
    out = _format_change(change, target)
    fig.update_layout(title_text=f'The following Changes will yield a {out} increase in {target}')
    fig.show()


def prediction(tones, campaign_val, industry_val, target):
    """Predict ``target`` for the given tones, campaign type and industry.

    Args:
        tones: tone data used to build the feature row; columns missing
            from it are filled with 0.
        campaign_val: name of the campaign indicator column to set to 1.
        industry_val: name of the industry indicator column to set to 1.
        target: key into ``model_dict`` and value of the ``model`` column in
            ``CI_rates``.

    Returns:
        ``(pred, lower, higher, model)``: the prediction, the lower and
        upper bounds (prediction plus the ``2_5`` / ``97_5`` values of
        ``CI_rates``, rounded to 3 decimals) and the model that was used.
    """
    model_val = pd.DataFrame(tones, columns=PARAMS['rf_labels']).fillna(0)
    model_val.loc[0, campaign_val] = 1
    model_val.loc[0, industry_val] = 1
    # Named rate_model so it does not shadow the module-level ``model``.
    rate_model = model_dict[target]
    pred = rate_model.predict(model_val)[0]
    CI = CI_rates[CI_rates['model'] == target]
    lower = pred + CI['2_5'].values[0]
    higher = pred + CI['97_5'].values[0]
    return pred, round(lower, 3), round(higher, 3), rate_model


## Plot recommendations for intensity changes
def intensity_changes(tones, recommend_changes, change, target):
    """Plot the tone-intensity changes and their effect on the target rate.

    Args:
        tones: current tones (unused; kept for interface compatibility).
        recommend_changes: one change value per entry of ``tone_labels``.
        change: resulting change in the target metric; its sign selects the
            "decrease" or "increase" wording of the title.
        target: name of the target metric; ``'Revenue_Per_Email'`` is shown
            in dollars, every other target as a percentage.
    """
    fig = _tone_change_figure(recommend_changes, '#00e6b1')
    if change < 0:
        # Show the magnitude; the wording carries the direction.
        out = _format_change(-change, target)
        fig.update_layout(title_text=f'The following Changes will decrease the {target} by {out}')
    elif change >= 0:
        out = _format_change(change, target)
        fig.update_layout(title_text=f'The following Changes will increase the {target} by {out}')
    fig.show()


def load_data():
    """Load ``Tone_and_target.csv`` and prepare it for modelling.

    Duplicate rows are dropped, ``industry`` and ``campaign_type`` are
    one-hot encoded, the ``Unnamed: 0`` and ``body`` columns are removed and
    three target columns are renamed to their underscore forms.

    Returns:
        The prepared DataFrame.
    """
    data_location = 'Tone_and_target.csv'
    df_unique = pd.read_csv(data_location).drop_duplicates()
    df_unique = pd.get_dummies(df_unique, columns=['industry', 'campaign_type'])
    df_data = df_unique.drop(columns=['Unnamed: 0', 'body'])
    return df_data.rename(columns={
        'Click-To-Open Rates': 'Click_To_Open_Rate',
        'Conversion Rate': 'Conversion_Rate',
        'Revenue Per email': 'Revenue_Per_Email',
    })


def plot_table(sorted_setence_tuple):
    """Build the sentence table (the bottom-most table).

    Args:
        sorted_setence_tuple: list of ``(sentence, score, rgb)`` tuples,
            where ``rgb`` is the colour tuple used to fill that row.

    Returns:
        The plotly figure.  It is not shown here; the caller decides.
    """
    if sorted_setence_tuple:
        sentences, scores, colors = zip(*sorted_setence_tuple)
    else:
        # zip(*[]) yields nothing, so an empty table is built explicitly.
        sentences, scores, colors = (), (), ()
    rbg_list = ['rgb' + str(color) for color in colors]
    fig = go.Figure(data=[go.Table(
        header=dict(
            values=['Sentences', 'Difference from Recommended Tone'],
            line_color='darkslategray',
            fill_color='#010405',
            align='center',
            font=dict(family="Metropolis", color='white', size=16),
        ),
        cells=dict(
            values=[sentences,  # 1st column
                    scores],  # 2nd column
            line_color='darkslategray',
            fill_color=[rbg_list],
            align=['left', 'center'],
            font=dict(family="Arial", size=12),
        ),
    )])
    return fig


def corrections(best, df):
    """Score each sentence by its cosine distance from the best tone.

    Args:
        best: tone values of the best email for the current categories.
        df: DataFrame whose row 0 holds the list of sentences under ``text``
            and the matching list of tone vectors under ``sentencetone``.

    The sentences are sorted by decreasing distance and handed to
    ``plot_table``; as before, nothing is returned.
    """
    # loxz green primary to Loxz light red
    colors = ['#48f0c9', '#6ef5d6', '#94f7e1', '#bbfaec', '#e6fff9',
              '#ffe7e6', '#ffc3bf', '#ffa099', '#ff7c73', '#ff584d']
    sentence_order = []
    for text, cur in zip(df['text'][0], df['sentencetone'][0]):
        distance = spatial.distance.cosine(best, cur)  # Cosine distance
        new_value = round(distance * 100)  # distance as a 0-100 score
        # Colour wheel value.  round(distance * 10) reaches 10 once the
        # distance is >= 0.95, one past the last palette entry, so clamp it.
        color_value = min(max(round(distance * 10), 0), len(colors) - 1)
        rbg = ImageColor.getcolor(colors[color_value], "RGB")
        sentence_order.append((text, new_value, rbg))
    sorted_sentences = sorted(sentence_order, key=lambda item: item[1], reverse=True)
    plot_table(sorted_sentences)


def read_file(fc):
    """Extract the body text from the email file selected in ``fc``.

    Args:
        fc: object whose ``.selected`` attribute is the path of the file.

    Returns:
        The concatenated text of every ``<body>`` element found in the
        message parts (an empty string if none is found).
    """
    with open(fc.selected, encoding="utf-8") as handle:
        data = handle.read()
    check = []
    for part in email.message_from_string(data).walk():
        # A multipart container's payload is a list of sub-messages, not
        # text; walk() visits the sub-parts themselves, so skip it.
        if part.is_multipart():
            continue
        body = str(part.get_payload())
        # NOTE(review): no parser is passed to BeautifulSoup, so the result
        # depends on which parser is installed -- confirm before pinning one.
        soup = BeautifulSoup(body)
        check.extend(paragraph.text for paragraph in soup.find_all('body'))
    return "".join(check)