Spaces:
Runtime error
Runtime error
from hashlib import shake_128 | |
import pandas as pd | |
import streamlit as st | |
from IPython.display import display | |
import email | |
import re | |
from bs4 import BeautifulSoup | |
import numpy as np | |
import random | |
from gensim.utils import simple_preprocess | |
from gensim.models.doc2vec import Doc2Vec, TaggedDocument | |
from sklearn.metrics import r2_score | |
from io import StringIO | |
import tempfile | |
import boto3 | |
s3 = boto3.resource('s3') | |
import joblib | |
s3_client = boto3.client('s3') | |
def get_files_from_aws(bucket,prefix):
    """
    Download one CSV object from an S3 bucket and load it as a DataFrame.

    bucket (STRING): bucket name
    prefix (STRING): key (file location) of the object in the bucket

    Credentials are read from the Streamlit secrets "aws_id" and "aws_key".
    The first CSV column is consumed as the index and then discarded, so the
    returned frame has a fresh 0..n-1 index.
    """
    client = boto3.client(
        's3',
        aws_access_key_id=st.secrets["aws_id"],
        aws_secret_access_key=st.secrets["aws_key"],
    )
    response = client.get_object(Bucket=bucket, Key=prefix)
    csv_text = response['Body'].read().decode('utf-8')
    frame = pd.read_csv(StringIO(csv_text), encoding="ISO-8859-1", index_col=0)
    return frame.reset_index(drop=True)
def display_CTA_color(text,color):
    """
    Build the HTML that shows one colored button per recommended CTA.

    text: sequence of percentages, one per CTA (printed after the button)
    color: sequence of CSS colors, indexed in step with ``text``

    Returns the per-CTA snippets joined by "<br>" ("" for empty input).
    """
    template = (
        "\n"
        "CTA Number {}:\n"
        '<input type="button"\n'
        'style="background-color:{};\n'
        "color:black;\n"
        "width:50px;\n"
        "height:30px;\n"
        'margin:4px"\n'
        'value=" ">Percentage: {}%'
    )
    snippets = [
        template.format(position + 1, color[position], rate)
        for position, rate in enumerate(text)
    ]
    return "<br>".join(snippets)
def display_CTA_text(percentage,text):
    """
    Build the HTML that shows one white button per recommended CTA text.

    percentage: sequence of percentages, one per CTA
    text: sequence of CTA labels, indexed in step with ``percentage``

    Each label is upper-cased and HTML-escaped before it is placed in the
    button's ``value`` attribute, so quotes or angle brackets in a label
    cannot break the markup. Returns the snippets joined by "<br>"
    ("" for empty input).
    """
    import html  # local import: only this helper needs it

    template = (
        "\n"
        "CTA Number {}:\n"
        '<input type="button"\n'
        'style="background-color:#FFFFFF;\n'
        "color:black;\n"
        "width:fit-content;\n"
        "height:30px;\n"
        'margin:4px"\n'
        'value="{}">Percentage: {}%'
    )
    snippets = []
    for position, rate in enumerate(percentage):
        # Upper-case first: escaping first would upper-case the entities.
        label = html.escape(text[position].upper(), quote=True)
        snippets.append(template.format(position + 1, label, rate))
    return "<br>".join(snippets)
def display_CTA_both(percentage, color, text):
    """
    Build the HTML that shows one colored, labelled button per recommended CTA.

    percentage: sequence of percentages, one per CTA
    color: sequence of CSS colors used as the button background
    text: sequence of CTA labels (upper-cased for display)

    All three sequences are indexed in step; the number of buttons follows
    ``text``. Color and label are HTML-escaped before being placed inside
    attributes, so a stray quote or angle bracket cannot break the markup.
    Returns the snippets joined by "<br>" ("" for empty input).
    """
    import html  # local import: only this helper needs it

    template = (
        "\n"
        "CTA Number {}:\n"
        '<input type="button"\n'
        'style="background-color:{};\n'
        "color:black;\n"
        "width: fit-content;\n"
        "height:30px;\n"
        'margin:4px"\n'
        'value="{}">Percentage: {}%'
    )
    snippets = []
    for position, raw_label in enumerate(text):
        background = html.escape(str(color[position]), quote=True)
        # Upper-case first: escaping first would upper-case the entities.
        label = html.escape(raw_label.upper(), quote=True)
        snippets.append(
            template.format(position + 1, background, label, percentage[position])
        )
    return "<br>".join(snippets)
## "=",=3D removed from html_tags.csv | |
def preprocess_text(doc, tags_path='data/html_tags.csv'):
    """
    Decode the "=XX" escape codes found in a raw email body.

    doc: text to clean
    tags_path: CSV file whose rows are ``character,code`` pairs; defaults to
        the file bundled with the app

    Every code listed in the CSV is replaced by its character. The codes
    =09, =0D, =0A and =22 are mapped to tab, newline, newline and a double
    quote regardless of the CSV's first column. A row with more than two
    fields is taken to describe the comma itself (its code is the third
    field). After that, "=3D" becomes "=" and "=" + newline sequences
    (soft line breaks) are removed.

    NOTE(review): "%3D" is used as a temporary placeholder for "=3D", so a
    literal "%3D" already present in ``doc`` also ends up as "=".

    Returns the cleaned text.
    """
    special_codes = {'=09': '\t', '=0D': '\n', '=0A': '\n', '=22': '"'}

    tags = {}
    with open(tags_path, 'r') as html_tags:
        for line in html_tags:
            fields = line.strip().split(',')
            if len(fields) < 2:
                # Blank or malformed row: nothing to map.
                continue
            if len(fields) > 2:
                # The comma row splits into three fields.
                char, code = ',', fields[2]
            else:
                char, code = fields[0].strip('"'), fields[1]
            tags[code] = special_codes.get(code, char)

    for code, char in tags.items():
        doc = doc.replace(code, char)

    # Protect "=3D" while the soft line breaks ("=" + newline) are removed.
    doc = doc.replace('=3D', '%3D')
    doc = doc.replace('=\n', '')
    doc = doc.replace('%3D', '=')
    return doc
def parse_features_from_html(body, soup):
    """
    Extract the visible text and the call-to-action (CTA) buttons of an email.

    body: unused; kept so existing callers keep working
    soup: BeautifulSoup document of the email

    A link counts as a button candidate when it has an ``href`` and its
    inline style mentions background-color, display and border-radius. A
    candidate is kept when its lower-cased text is listed in
    data/cta_text_list.txt or contains an entry of data/cta_verbs_list.txt.

    Returns (visible_text, colors, texts) where ``colors`` and ``texts`` are
    parallel lists describing the kept buttons; both are empty when nothing
    qualifies.
    """
    with open('data/cta_text_list.txt', 'r') as cta_file:
        cta_list = {ln.strip() for ln in cta_file}
    with open('data/cta_verbs_list.txt', 'r') as cta_vfile:
        cta_verbs = [ln.strip() for ln in cta_vfile]

    # Visible text with every run of whitespace collapsed to one space.
    vtexts = " ".join(preprocess_text(soup.get_text()).split())

    tag_pattern = re.compile('<.*?>')
    required_props = ('background-color', 'display', 'border-radius')

    # (color, text) pairs; recorded together so the output lists stay aligned.
    buttons = []
    for anchor in soup.find_all('a', {'href': True}):
        try:
            style = anchor['style'].replace('\r', '').replace('\n', '')
            if not all(prop in style for prop in required_props):
                continue

            color = None
            for declaration in style.split(';'):
                if 'background-color' not in declaration:
                    continue
                cl = declaration.split(':')[1].lower()
                cl = cl.replace('!important', '').replace('=', '')
                if cl.strip() == 'transparent':
                    cl = '#00ffffff'
                if 'rgb' in cl:
                    rgb = cl[cl.index('(') + 1:cl.index(')')].split(',')
                    # NOTE(review): rgb_to_hex is not defined in the visible
                    # part of this file -- confirm it exists in this module.
                    cl = rgb_to_hex((int(rgb[0]), int(rgb[1]), int(rgb[2])))
                # With several declarations the last one is kept, as in CSS.
                color = cl.strip()
            if color is None:
                continue

            label = anchor.string.replace('\n', '').replace('\t', ' ')
            label = tag_pattern.sub('', label).lower().replace('→', '')
        except Exception:
            # Best effort: links without an inline style, without a single
            # text node, or with a malformed color are skipped.
            continue
        buttons.append((color, label.strip()))

    op_color = []
    op_text = []
    for color, label in buttons:
        if label in cta_list or any(verb in label for verb in cta_verbs):
            op_color.append(color)
            op_text.append(label)
    return vtexts, op_color, op_text
## Input: the parsed email from email_upload()
## RETURN: the visible text, plus each CTA's color and text as lists
def email_parser(parsed_email):
    """
    Turn an uploaded email into its visible text and CTA features.

    parsed_email: iterable of strings that together form the raw message

    The pieces are concatenated and parsed as an email message; the payload
    of the last part visited by ``walk()`` is used as the body. The body is
    cleaned with preprocess_text, parsed with BeautifulSoup and handed to
    parse_features_from_html.

    Returns (visible_text, cta_colors, cta_texts).
    """
    raw_message = "".join(parsed_email)
    message = email.message_from_string(raw_message)

    body = ""
    for part in message.walk():
        if part.get_content_type():
            body = str(part.get_payload())

    cleaned = preprocess_text(body)
    soup = BeautifulSoup(cleaned)

    ## Get CTA features from soup items of emails
    visible_text, cta_colors, cta_texts = parse_features_from_html(body, soup)
    return visible_text, cta_colors, cta_texts
## Generate word embeddings for each CTA text using Doc2Vec | |
def text_embeddings(texts):
    """
    Train a Doc2Vec model on the given texts.

    texts: iterable of strings; each one is tokenized with simple_preprocess
        and tagged with its position in the iterable

    The model is built with workers=1 and seed=1 (presumably for repeatable
    results); every other hyperparameter is left at the gensim default.

    Returns the trained Doc2Vec model.
    """
    tagged_docs = [
        TaggedDocument(simple_preprocess(entry), [tag])
        for tag, entry in enumerate(texts)
    ]
    return Doc2Vec(tagged_docs, workers = 1, seed = 1)
###### Model Training - ONLY TO SAVE IN S3 BUCKET ###### | |
def get_predictions(selected_variable, selected_industry, selected_campaign,
                    selected_cta, email_text, cta_col, cta_txt, cta_menu):
    """
    Predict the target rate for each selected CTA and show recommendations.

    selected_variable: 'Click_To_Open_Rate' or 'Conversion_Rate'
    selected_industry: industry label, looked up in the training data
    selected_campaign: campaign label; unseen labels get a new numeric code
    selected_cta: 'Color', 'Text' or 'Both' -- what to recommend
    email_text: visible email text, used to find similar training entries
    cta_col, cta_txt: colors and texts of the CTAs found in the email
    cta_menu: iterable of objects with a ``.value`` flag, one per CTA, that
        tells whether that CTA was selected

    The prediction and the recommendations are rendered through Streamlit.
    Returns the R^2 score of the loaded model on the stored test set.
    Raises ValueError for an unsupported ``selected_variable``.

    NOTE(review): processing is done once per selected CTA, and
    recommendations are skipped when the prediction is negative -- confirm
    both against the intended behaviour.
    NOTE(review): an industry absent from the training data produces no
    feature row (dropna), so the model is then called on an empty array --
    presumably it raises; confirm and handle if needed.
    """
    artifacts = _TARGET_ARTIFACTS.get(selected_variable)
    if artifacts is None:
        raise ValueError(
            'Unsupported target variable: {!r}'.format(selected_variable))
    X_name, y_name, key = artifacts

    training_dataset = get_files_from_aws('emailcampaigntrainingdata', 'ModelCTA/training.csv')
    X_test = get_files_from_aws('emailcampaigntrainingdata', 'ModelCTA/' + X_name)
    y_test = get_files_from_aws('emailcampaigntrainingdata', 'ModelCTA/' + y_name)

    # The model is read from a local path; no S3 download happens here.
    regr = joblib.load(key)

    # Map each distinct value of the first column to the fifth-column value
    # of its first occurrence. The keys train the similarity model; the
    # values are looked up by document tag when matching CTA texts.
    email_body_dict = {}
    for first_col, fifth_col in zip(training_dataset.iloc[:, 0],
                                    training_dataset.iloc[:, 4]):
        email_body_dict.setdefault(first_col, fifth_col)
    texts = list(email_body_dict.values())

    y_pred = regr.predict(X_test)
    r2_test = r2_score(y_test, y_pred)

    ## Get recommendation
    recom_model = text_embeddings(email_body_dict.keys())

    industry_code_dict = dict(zip(training_dataset.industry, training_dataset.industry_code))
    campaign_code_dict = dict(zip(training_dataset.campaign, training_dataset.campaign_code))
    color_code_dict = dict(zip(training_dataset.cta_color, training_dataset.color_code))
    text_code_dict = dict(zip(training_dataset.cta_text, training_dataset.text_code))

    # Recommendation pool: training rows of the selected industry, sorted
    # by ascending rate so that keep='last' de-duplication keeps the best.
    selected_industry_code = industry_code_dict.get(selected_industry)
    df_recom = training_dataset[["industry_code", "campaign_code", "cta_color",
                                 "cta_text", selected_variable]]
    df_recom = df_recom[df_recom["industry_code"] == selected_industry_code].copy()
    df_recom[selected_variable] = df_recom[selected_variable].apply(lambda x: round(x, 5))
    df_recom_sort = df_recom.sort_values(by=[selected_variable])

    for cta_ind, option in enumerate(cta_menu):  # For each CTA selected
        if not option.value:
            continue
        selected_color = cta_col[cta_ind]
        selected_text = cta_txt[cta_ind]

        # Single feature row in the column order the model receives.
        features = pd.DataFrame([{
            'industry_code': industry_code_dict.get(selected_industry),
            'campaign_code': _code_for(campaign_code_dict, selected_campaign),
            'color_code': _code_for(color_code_dict, selected_color),
            'text_code': _code_for(text_code_dict, selected_text),
        }]).dropna()
        arr = features.to_numpy().astype('float64')
        output_rate = regr.predict(arr)[0]

        if output_rate < 0:
            st.text("Sorry, Current model couldn't provide predictions on the target variable you selected.")
            continue
        st.info('Model Prediction on the {} is {}'.format(selected_variable, round(output_rate*100, 2)))

        ## Filter recommendations for either CTA text or color
        if selected_cta == 'Color':
            candidates = df_recom_sort.drop_duplicates(subset=['cta_color'], keep='last')
        elif selected_cta == 'Text':
            pool = df_recom_sort.drop_duplicates(subset=['cta_text'], keep='last')
            candidates = _similar_text_candidates(
                pool, recom_model, texts, email_text, selected_variable, selected_text)
        elif selected_cta == 'Both':
            pool = df_recom_sort.drop_duplicates(subset=['cta_color', 'cta_text'], keep='last')
            candidates = _similar_text_candidates(
                pool, recom_model, texts, email_text, selected_variable, selected_text)
        else:
            continue

        rows, target_rate_arr = _select_recommendations(
            candidates, selected_variable, output_rate)
        recom_colors = rows['cta_color'].tolist()
        recom_texts = rows['cta_text'].tolist()

        st.info('To get a higher {}, the model recommends the following options:'.format(selected_variable))
        if selected_cta == 'Color':
            cta_result = display_CTA_color(target_rate_arr, recom_colors)
        elif selected_cta == 'Text':
            cta_result = display_CTA_text(target_rate_arr, recom_texts)
        else:
            cta_result = display_CTA_both(target_rate_arr, recom_colors, recom_texts)
        st.components.v1.html(cta_result, height=len(target_rate_arr)*30+50)

    return r2_test


# Per target variable: (X test CSV, y test CSV, local model path).
_TARGET_ARTIFACTS = {
    'Click_To_Open_Rate': ('Xtest_CTOR.csv', 'ytest_CTOR.csv',
                           'models/' + 'modelCTA_CTOR_new.sav'),
    'Conversion_Rate': ('Xtest_Conversion_Rate.csv', 'ytest_Conversion_Rate.csv',
                        'models/' + 'modelCTA_ConversionRate_new.sav'),
}


def _code_for(mapping, label):
    """Return the numeric code of ``label``, assigning max+1 to unseen labels."""
    if label not in mapping:
        mapping[label] = max(mapping.values()) + 1
    return mapping[label]


def _similar_text_candidates(pool, recom_model, texts, email_text,
                             selected_variable, selected_text):
    """Return rows of ``pool`` whose CTA text belongs to the entries most
    similar to ``email_text`` (all of ``pool`` when none match), without
    duplicate texts or rates and without the currently selected text."""
    words = simple_preprocess(email_text)
    test_doc_vector = recom_model.infer_vector(words)
    recom_similar = recom_model.dv.most_similar(positive=[test_doc_vector], topn=30)

    matches = [pool[pool['cta_text'] == texts[tag]] for tag, _ in recom_similar]
    similar_rows = pd.concat(matches) if matches else pool.iloc[0:0]
    if len(similar_rows) == 0:
        similar_rows = pool

    unique_rows = similar_rows.drop_duplicates(subset=['cta_text'], keep='last')
    unique_rows = unique_rows.drop_duplicates(subset=[selected_variable], keep='last')
    return unique_rows[unique_rows['cta_text'] != selected_text]


def _select_recommendations(candidates, selected_variable, output_rate):
    """Pick up to three rows to recommend and the rate (in %) shown for each.

    When at least two of the first three candidates above ``output_rate``
    exist, they are returned with their recorded rates, best first.
    Otherwise three candidates are sampled at random and given generated
    rates slightly above ``output_rate``. An empty ``candidates`` frame
    yields no recommendation."""
    better = candidates[candidates[selected_variable] > output_rate].head(3)
    better = better.sort_values(by=[selected_variable], ascending=False)
    if len(better) >= 2:
        rates = [round(rate * 100, 2) for rate in better[selected_variable]]
        return better, rates

    if len(candidates) == 0:
        return candidates, []

    # Sample with replacement only when fewer than three rows are available.
    extras = candidates.sample(n=3, replace=len(candidates) < 3)
    rates = []
    increment = output_rate + (0.02*3)
    for _ in range(len(extras)):
        target_rate = random.uniform(increment - 0.02, increment)
        increment = target_rate - 0.001
        rates.append(round(target_rate * 100, 2))
    return extras, rates