from hashlib import shake_128
import pandas as pd
import streamlit as st
from IPython.display import display
import email
import re
from bs4 import BeautifulSoup
import numpy as np
import random
from gensim.utils import simple_preprocess
from gensim.models.doc2vec import Doc2Vec, TaggedDocument
from sklearn.metrics import r2_score
from io import StringIO
import tempfile
import boto3
# Module-level S3 resource handle, created at import time; no explicit
# credentials are passed here.
# NOTE(review): no active code visible in this file reads `s3` — confirm
# whether it is still needed.
s3 = boto3.resource('s3')
import joblib
# Module-level S3 client, also created without explicit credentials.
# NOTE(review): get_files_from_aws builds its own client under the same name
# in local scope, so this global is not what that function uses.
s3_client = boto3.client('s3')
def get_files_from_aws(bucket, prefix):
    """Download a CSV object from S3 and return it as a DataFrame.

    Args:
        bucket (str): Name of the S3 bucket.
        prefix (str): Key (location) of the CSV object inside the bucket.

    Returns:
        pandas.DataFrame: The parsed CSV. The first CSV column is consumed as
        the index while parsing and then discarded, so the result carries a
        fresh 0..n-1 index.
    """
    # Credentials are read from the Streamlit secrets store.
    s3_client = boto3.client(
        's3',
        aws_access_key_id=st.secrets["aws_id"],
        aws_secret_access_key=st.secrets["aws_key"],
    )
    file_obj = s3_client.get_object(Bucket=bucket, Key=prefix)

    # NOTE(review): this statement was truncated to `string ='utf-8')` in the
    # file; reading the streaming body and decoding it as UTF-8 is the
    # reconstruction — confirm against version control.
    csv_text = file_obj['Body'].read().decode('utf-8')

    df = pd.read_csv(StringIO(csv_text), encoding="ISO-8859-1", index_col=0)
    return df.reset_index(drop=True)
def display_CTA_color(text, color):
    """Build the HTML snippet listing colour-only CTA recommendations.

    Args:
        text: Sequence of percentages to print, one per recommendation
            (despite the parameter name, callers pass the rate list here).
        color: Sequence of CSS colour values, parallel to ``text``.

    Returns:
        str: One blank button per recommendation, painted with its colour and
        followed by its percentage; entries are separated by ``<br>``.
        Pairs are formed with ``zip``, so surplus items in the longer
        sequence are ignored. Empty input yields ``""``.
    """
    from html import escape  # local import: only needed for HTML output

    # NOTE(review): the style attribute was missing from the file (the
    # template had two placeholders for three arguments, so the colour was
    # printed as the percentage). `background-color` is the reconstruction —
    # confirm the intended button styling.
    template = (
        '\n    CTA Number {}:\n'
        '    <input type="button"\n'
        '        style="background-color:{};"\n'
        '        value=" ">Percentage: {}%'
    )
    entries = [
        # Values originate from data files, so escape them before they are
        # placed inside attributes / markup.
        template.format(number, escape(str(swatch)), escape(str(rate)))
        for number, (rate, swatch) in enumerate(zip(text, color), start=1)
    ]
    return "<br>".join(entries)
def display_CTA_text(percentage, text):
    """Build the HTML snippet listing text-only CTA recommendations.

    Args:
        percentage: Sequence of percentages to print, one per recommendation.
        text: Sequence of CTA labels, parallel to ``percentage``.

    Returns:
        str: One button per recommendation whose caption is the upper-cased
        label, followed by its percentage; entries are separated by ``<br>``.
        Pairs are formed with ``zip``, so surplus items in the longer
        sequence are ignored. Empty input yields ``""``.
    """
    from html import escape  # local import: only needed for HTML output

    template = (
        '\n    CTA Number {}:\n'
        '    <input type="button"\n'
        '        value="{}">Percentage: {}%'
    )
    entries = [
        # Upper-case first, then escape, so HTML entities are not upper-cased.
        template.format(number, escape(str(label).upper()), escape(str(rate)))
        for number, (rate, label) in enumerate(zip(percentage, text), start=1)
    ]
    # join() puts a separator between entries only, which replaces the
    # original "is this the last index" check that mixed up the two lengths.
    return "<br>".join(entries)
def display_CTA_both(percentage, color, text):
    """Build the HTML snippet listing colour-and-text CTA recommendations.

    Args:
        percentage: Sequence of percentages to print, one per recommendation.
        color: Sequence of CSS colour values, parallel to ``percentage``.
        text: Sequence of CTA labels, parallel to ``percentage``.

    Returns:
        str: One button per recommendation, painted with its colour and
        captioned with the upper-cased label, followed by its percentage;
        entries are separated by ``<br>``. Triples are formed with ``zip``,
        so surplus items in longer sequences are ignored. Empty input yields
        ``""``.
    """
    from html import escape  # local import: only needed for HTML output

    # NOTE(review): only `width: fit-content;` survived of the style
    # attribute in the file (three placeholders for four arguments, so the
    # colour landed in `value` and the label in the percentage slot).
    # `background-color` plus the surviving width rule is the reconstruction
    # — confirm the intended button styling.
    template = (
        '\n    CTA Number {}:\n'
        '    <input type="button"\n'
        '        style="background-color:{}; width: fit-content;"\n'
        '        value="{}">Percentage: {}%'
    )
    entries = [
        template.format(
            number,
            escape(str(swatch)),
            escape(str(label).upper()),
            escape(str(rate)),
        )
        for number, (rate, swatch, label)
        in enumerate(zip(percentage, color, text), start=1)
    ]
    return "<br>".join(entries)
## "=",=3D removed from html_tags.csv
def preprocess_text(doc, tags_path='data/html_tags.csv'):
    """Replace quoted-printable style escape codes in ``doc`` with characters.

    The mapping is read from a CSV whose lines look like ``"<char>",<code>``.
    A line that splits into more than two fields is treated as the entry for
    the comma character itself, with the code taken from the third field.

    Args:
        doc (str): Text to clean.
        tags_path (str): Path of the mapping CSV. Defaults to the location
            the function has always read from.

    Returns:
        str: ``doc`` with every mapped code replaced, ``=3D`` turned into
        ``=``, and ``=`` + newline sequences (soft line breaks) removed.
    """
    # Codes whose replacement is fixed here instead of taken from the CSV.
    control_chars = {'=09': '\t', '=0D': '\n', '=0A': '\n', '=22': '"'}

    tags = {}
    with open(tags_path, 'r') as html_tags:
        for line in html_tags:
            ln = line.strip().split(',')
            if len(ln) < 2:
                continue  # blank or malformed line: nothing to map
            ln[0] = ln[0].strip('"')
            if len(ln) > 2:
                ln[0] = ','
                ln[1] = ln[2]
            if not ln[1]:
                continue  # an empty code would match between every character
            # NOTE(review): in the file the CSV value was assigned
            # unconditionally after the special cases, overwriting them;
            # treating it as the fallback branch is the reconstruction.
            tags[ln[1]] = control_chars.get(ln[1], ln[0])

    for key, val in tags.items():
        doc = doc.replace(key, val)

    # Park the encoded '=' as '%3D' so that removing soft line breaks
    # ('=' + newline) cannot touch it, then restore it.
    # NOTE(review): a literal '%3D' already present in the input is also
    # turned into '=' by the last step (same as before this change).
    doc = doc.replace('=3D', '%3D')
    doc = doc.replace('=\n', '')
    return doc.replace('%3D', '=')
def parse_features_from_html(body, soup):
    """Extract the visible text and the button-like CTAs of an HTML email.

    A link counts as a CTA candidate when it has an ``href`` and an inline
    style that mentions ``background-color``, ``display`` and
    ``border-radius``. A candidate is kept when its label is listed in
    ``data/cta_text_list.txt`` or contains an entry of
    ``data/cta_verbs_list.txt``.

    Args:
        body: Raw email body. Not used by this function; kept for callers.
        soup: Parsed document exposing ``get_text()`` and ``find_all()``
            (a BeautifulSoup object in this file).

    Returns:
        tuple: ``(visible_text, colors, labels)`` where ``visible_text`` is
        the whitespace-normalised document text and ``colors`` / ``labels``
        are parallel lists describing the kept CTAs (both empty when nothing
        qualifies).
    """
    # NOTE(review): the bodies of both file loops were missing from the file;
    # collecting the stripped, non-blank lines is the reconstruction.
    with open('data/cta_text_list.txt', 'r') as cta_file:
        cta_list = {ln.strip() for ln in cta_file if ln.strip()}
    with open('data/cta_verbs_list.txt', 'r') as cta_vfile:
        cta_verbs = [ln.strip() for ln in cta_vfile if ln.strip()]

    # Visible text of the whole document, collapsed to single spaces.
    vtexts = " ".join(preprocess_text(soup.get_text()).split())

    tag_pattern = re.compile('<.*?>')
    ccolor = []
    text = []
    for link in soup.find_all('a', {'href': True}):
        style = link.get('style')
        if not style:
            continue  # links without inline style cannot be styled buttons
        style = str(style).replace('\r', '').replace('\n', '')
        if not ('background-color' in style
                and 'display' in style
                and 'border-radius' in style):
            continue

        color = None
        for declaration in style.split(';'):
            if 'background-color' not in declaration or ':' not in declaration:
                continue
            cl = declaration.split(':')[1].lower()
            cl = cl.replace('!important', '').replace('=', '')
            if cl.strip() == 'transparent':
                cl = '#00ffffff'
            if 'rgb' in cl:
                try:
                    rgb = cl[cl.index('(') + 1:cl.index(')')].split(',')
                    # NOTE(review): the file called `rgb_to_hex`, which is not
                    # defined in the visible source; '#rrggbb' formatting is
                    # assumed here — confirm.
                    cl = '#{:02x}{:02x}{:02x}'.format(
                        int(rgb[0]), int(rgb[1]), int(rgb[2]))
                except (ValueError, IndexError):
                    continue  # malformed rgb(...) value
            # Keep one colour per link so colours and labels stay aligned.
            color = cl.strip()
        if color is None:
            continue

        # `.string` is None when the link wraps nested tags; fall back to the
        # concatenated text in that case.
        raw = link.string if link.string is not None else link.get_text()
        label = tag_pattern.sub(
            '', raw.replace('\n', '').replace('\t', ' ')).lower()
        # str.replace returns a new string; the file discarded the result.
        label = label.replace('→', '').replace('\t', ' ').strip()

        ccolor.append(color)
        text.append(label)

    # NOTE(review): the statements filling the output lists were missing from
    # the file; keeping the label/colour pair on a list or verb match is the
    # reconstruction.
    op_color = []
    op_text = []
    for label, color in zip(text, ccolor):
        if label in cta_list or any(verb in label for verb in cta_verbs):
            op_text.append(label)
            op_color.append(color)
    return vtexts, op_color, op_text
## Parsed email from email_upload()
## RETURN: visible text, plus each CTA's color and text as lists
def email_parser(parsed_email):
    """Parse an uploaded email and extract its CTA features.

    Args:
        parsed_email: Iterable of strings (e.g. the lines of the uploaded
            file) that together form the raw email.

    Returns:
        tuple: ``(visible_text, colors, labels)`` as produced by
        ``parse_features_from_html`` for the last non-container MIME part, or
        ``("", [], [])`` when the message has no such part.
    """
    message = email.message_from_string("".join(parsed_email))

    vtext, ccolor, text = "", [], []
    for part in message.walk():
        # Container parts carry a list of sub-messages, not text; their
        # children are visited by walk() right after them.
        if part.is_multipart():
            continue
        body = str(part.get_payload())
        doc = preprocess_text(body)
        # NOTE(review): no parser is named, so BeautifulSoup picks whichever
        # is installed; left unchanged to keep parsing results as they were.
        soup = BeautifulSoup(doc)
        ## Get CTA features from soup items of emails
        vtext, ccolor, text = parse_features_from_html(body, soup)
    return vtext, ccolor, text
## Generate word embeddings for each CTA text using Doc2Vec
def text_embeddings(texts):
    """Train and return a Doc2Vec model over ``texts``.

    Each text is tokenised with ``simple_preprocess`` and tagged with its
    position in ``texts``, so a document tag returned by the model can be
    used as an index into the same sequence.

    Only ``workers=1`` and ``seed=1`` are passed to ``Doc2Vec``; every other
    hyperparameter is left at the library default.
    """
    tagged_docs = [
        TaggedDocument(simple_preprocess(document), [position])
        for position, document in enumerate(texts)
    ]
    return Doc2Vec(tagged_docs, workers=1, seed=1)
###### Prediction and recommendations from the stored CTA model ######
def _code_for(code_dict, label):
    """Return the numeric code of ``label``, assigning max+1 to unseen labels."""
    if label not in code_dict:
        code_dict[label] = max(code_dict.values()) + 1
    return code_dict[label]


def _pick_recommendations(candidates, selected_variable, output_rate):
    """Return up to three ``(row, percentage)`` recommendations.

    Rows whose rate beats ``output_rate`` are used when at least two exist.
    Otherwise three rows are sampled and given synthetic, randomly generated
    percentages slightly above the prediction (these are NOT model outputs).
    """
    better = candidates[candidates[selected_variable] > output_rate]
    ranked = better.head(3).sort_values(by=[selected_variable], ascending=False)
    if len(ranked) >= 2:
        return [(row, round(row[selected_variable] * 100, 2))
                for _, row in ranked.iterrows()]

    if candidates.empty:
        return []  # DataFrame.sample cannot draw from an empty frame
    sampled = candidates.sample(n=3, replace=len(candidates) < 3)
    picks = []
    increment = output_rate + (0.02 * 3)
    for _, row in sampled.iterrows():
        target_rate = random.uniform(increment - 0.02, increment)
        increment = target_rate - 0.001
        picks.append((row, round(target_rate * 100, 2)))
    return picks


def _similar_text_candidates(recom_model, email_text, texts, pool,
                             selected_variable, selected_text):
    """Return rows of ``pool`` whose CTA text belongs to similar documents.

    Falls back to the whole ``pool`` when no similar document matches, then
    removes duplicate texts, duplicate rates and the user's own CTA text.
    """
    words = simple_preprocess(email_text)
    test_doc_vector = recom_model.infer_vector(words)
    recom_similar = recom_model.dv.most_similar(positive=[test_doc_vector], topn=30)

    matches = [pool[pool['cta_text'] == texts[tag]] for tag, _ in recom_similar]
    matches = [frame for frame in matches if not frame.empty]
    matched = pd.concat(matches) if matches else pool

    matched = matched.drop_duplicates(subset=['cta_text'], keep='last')
    matched = matched.drop_duplicates(subset=[selected_variable], keep='last')
    return matched[matched['cta_text'] != selected_text]


def get_predictions(selected_variable, selected_industry, selected_campaign,
                    selected_cta, email_text, cta_col, cta_txt, cta_menu):
    """Predict the target rate of the selected CTA and show recommendations.

    Args:
        selected_variable (str): ``'Click_To_Open_Rate'`` or
            ``'Conversion_Rate'``; also the name of the rate column in the
            training data.
        selected_industry: Industry label chosen by the user.
        selected_campaign: Campaign label chosen by the user.
        selected_cta (str): ``'Color'``, ``'Text'`` or ``'Both'`` — which
            kind of recommendation to render; any other value renders none.
        email_text (str): Email text used to look up similar training
            documents.
        cta_col: CTA colours found in the email, indexable by position.
        cta_txt: CTA texts found in the email, parallel to ``cta_col``.
        cta_menu: Iterable of objects with a ``value`` attribute; the last
            one whose value is truthy marks the CTA to evaluate.

    Returns:
        float: R² of the loaded model on the stored test split.

    Raises:
        ValueError: If ``selected_variable`` is not supported, no CTA is
            selected, or the industry is unknown to the training data.
    """
    artifacts = {
        'Click_To_Open_Rate': ('Xtest_CTOR.csv', 'ytest_CTOR.csv',
                               'models/' + 'modelCTA_CTOR_new.sav'),
        'Conversion_Rate': ('Xtest_Conversion_Rate.csv',
                            'ytest_Conversion_Rate.csv',
                            'models/' + 'modelCTA_ConversionRate_new.sav'),
    }
    if selected_variable not in artifacts:
        raise ValueError(
            'Unsupported target variable: {!r}'.format(selected_variable))
    X_name, y_name, key = artifacts[selected_variable]

    training_dataset = get_files_from_aws('emailcampaigntrainingdata', 'ModelCTA/training.csv')
    X_test = get_files_from_aws('emailcampaigntrainingdata', 'ModelCTA/' + X_name)
    y_test = get_files_from_aws('emailcampaigntrainingdata', 'ModelCTA/' + y_name)

    # The model is read from a local path (the S3 download was already
    # disabled in the file).
    # SECURITY: joblib.load unpickles the file — only load trusted artifacts.
    regr = joblib.load(key)

    # First value of column 4 seen for each distinct value of column 0.
    # presumably column 0 is the email body and column 4 the CTA text —
    # confirm against training.csv
    email_body_dict = {}
    for first_col, fifth_col in zip(training_dataset.iloc[:, 0],
                                    training_dataset.iloc[:, 4]):
        email_body_dict.setdefault(first_col, fifth_col)
    email_body = email_body_dict.keys()
    texts = list(email_body_dict.values())

    y_pred = regr.predict(X_test)
    r2_test = r2_score(y_test, y_pred)

    ## Get recommendation
    recom_model = text_embeddings(email_body)

    industry_code_dict = dict(zip(training_dataset.industry, training_dataset.industry_code))
    campaign_code_dict = dict(zip(training_dataset.campaign, training_dataset.campaign_code))
    color_code_dict = dict(zip(training_dataset.cta_color, training_dataset.color_code))
    text_code_dict = dict(zip(training_dataset.cta_text, training_dataset.text_code))

    # The last ticked entry of the menu decides which CTA is evaluated.
    cta_ind = None
    for ip_idx, ip in enumerate(cta_menu):
        if ip.value:
            cta_ind = ip_idx
    if cta_ind is None:
        raise ValueError('No CTA selected.')
    selected_color = cta_col[cta_ind]
    selected_text = cta_txt[cta_ind]

    selected_industry_code = industry_code_dict.get(selected_industry)
    if selected_industry_code is None:
        raise ValueError('Unknown industry: {!r}'.format(selected_industry))

    # Feature order matches the order the columns were added originally.
    df_uploaded_test = pd.DataFrame({
        'industry_code': [selected_industry_code],
        'campaign_code': [_code_for(campaign_code_dict, selected_campaign)],
        'color_code': [_code_for(color_code_dict, selected_color)],
        'text_code': [_code_for(text_code_dict, selected_text)],
    }).dropna()
    arr = df_uploaded_test.to_numpy().astype('float64')
    output_rate = regr.predict(arr)[0]

    if output_rate < 0:
        st.text("Sorry, Current model couldn't provide predictions on the target variable you selected.")
    else:
        # NOTE(review): the name of the Streamlit call was lost in the file;
        # st.info is assumed — confirm.
        st.info('Model Prediction on the {} is {}'.format(selected_variable, round(output_rate*100, 2)))

    ### Create dataset for recommendation (training rows of the same industry)
    df_recom = training_dataset[["industry_code", "campaign_code", "cta_color", "cta_text",
                                 selected_variable]]
    # copy() so the rounding below does not write into a slice of the
    # training data.
    df_recom = df_recom[df_recom["industry_code"] == selected_industry_code].copy()
    df_recom[selected_variable] = df_recom[selected_variable].apply(lambda x: round(x, 5))
    df_recom_sort = df_recom.sort_values(by=[selected_variable])

    # NOTE(review): same lost call name as above; st.info assumed.
    headline = 'To get a higher {}, the model recommends the following options:'.format(selected_variable)

    ## Filter recommendations for either CTA text or color
    if selected_cta == 'Color':
        candidates = df_recom_sort.drop_duplicates(subset=['cta_color'], keep='last')
        st.info(headline)
        picks = _pick_recommendations(candidates, selected_variable, output_rate)
        target_rate_arr = [rate for _, rate in picks]
        recom_cta_arr = [row['cta_color'] for row, _ in picks]
        cta_result = display_CTA_color(target_rate_arr, recom_cta_arr)
        st.components.v1.html(cta_result, height=len(target_rate_arr)*30+50)

    elif selected_cta == 'Text':
        pool = df_recom_sort.drop_duplicates(subset=['cta_text'], keep='last')
        candidates = _similar_text_candidates(
            recom_model, email_text, texts, pool, selected_variable, selected_text)
        st.info(headline)
        picks = _pick_recommendations(candidates, selected_variable, output_rate)
        target_rate_arr = [rate for _, rate in picks]
        recom_cta_arr = [row['cta_text'] for row, _ in picks]
        cta_result = display_CTA_text(target_rate_arr, recom_cta_arr)
        st.components.v1.html(cta_result, height=len(target_rate_arr)*30+50)

    elif selected_cta == 'Both':
        # The fallback now uses this de-duplicated, sorted pool; the file
        # fell back to the unsorted industry frame here.
        pool = df_recom_sort.drop_duplicates(subset=['cta_color', 'cta_text'], keep='last')
        candidates = _similar_text_candidates(
            recom_model, email_text, texts, pool, selected_variable, selected_text)
        st.info(headline)
        picks = _pick_recommendations(candidates, selected_variable, output_rate)
        target_rate_arr = [rate for _, rate in picks]
        recom_cta_color_arr = [row['cta_color'] for row, _ in picks]
        recom_cta_text_arr = [row['cta_text'] for row, _ in picks]
        cta_result = display_CTA_both(target_rate_arr, recom_cta_color_arr, recom_cta_text_arr)
        st.components.v1.html(cta_result, height=len(target_rate_arr)*30+50)

    return r2_test