with with 'href'
try:
#if i['style']:
style = i['style']
style = style.replace('\r', '')
style = style.replace('\n', '')
styles = style.split(';')
color_flag = 0 ## Indicate whether there's 'background-color' option
style_str = str(style)
if ('background-color' in style_str) and ('display' in style_str) and ('border-radius' in style_str):
# print(styles)
for s in styles:
if 'background-color' in s:
cl = s.split(':')[1].lower()
cl = cl.replace('!important', '')
cl = cl.replace('=', '')
if cl.strip() == 'transparent':
cl = '#00ffffff'
if 'rgb' in cl:
rgb = cl[cl.index('(')+1:cl.index(')')].split(',')
cl = rgb_to_hex((int(rgb[0]), int(rgb[1]), int(rgb[2])))
ccolor.append(cl.strip()) # Add background color to CTA color list
color_flag = 1
if color_flag == 1:
## Remove surrounding '<>' of the text
clean = re.compile('<.*?>')
t = re.sub(clean, '', i.string.replace('\n', '').replace('\t', ' ')).lower()
## Replace/remove unwanted characters
t.replace('→', '')
t.replace('\t', ' ')
## Check if additional chars are there in the string
# if '>' in t:
# t = t[:t.index['>']]
text.append(t.strip())
# print(i.string.replace('\n', ''))
except:
continue
op_color = [] # Output text and color lists
op_text = []
if (text == []) or (ccolor == []):
return vtexts, [], []
else:
## cta_list, cta_verbs
for c in range(len(text)):
if text[c] in cta_list:
op_text.append(text[c])
op_color.append(ccolor[c])
else:
for cv in cta_verbs:
if cv in text[c]:
op_text.append(text[c])
op_color.append(ccolor[c])
return vtexts, op_color, op_text
## Parsed email from email_upload()
## RETURN: Each CTA text and its color as lists
def email_parser(parsed_email):
    """Rebuild an email from its lines and extract CTA features from its body.

    Args:
        parsed_email: Iterable of strings that, concatenated in order, form
            the raw email text.

    Returns:
        The three values produced by ``parse_features_from_html`` for the
        email body, in the same order.
    """
    raw_message = "".join(parsed_email)
    message = email.message_from_string(raw_message)

    body = ""
    for section in message.walk():
        # Each part with a truthy content type overwrites ``body``, so the
        # payload of the last part visited is the one that is kept.
        if section.get_content_type():
            body = str(section.get_payload())

    soup = BeautifulSoup(preprocess_text(body))

    # Get CTA features from the soup items of the email.
    verb_texts, cta_colors, cta_texts = parse_features_from_html(body, soup)
    return verb_texts, cta_colors, cta_texts
## Generate word embeddings for each CTA text using Doc2Vec
def text_embeddings(texts):
    """Train a Doc2Vec model on the given texts.

    Each text is tokenized with ``simple_preprocess`` and tagged with its
    position in ``texts``, so a document tag doubles as an index back into
    the input sequence.

    Args:
        texts: Iterable of strings to embed.

    Returns:
        The ``Doc2Vec`` model built from the tagged documents.
    """
    tagged_docs = [
        TaggedDocument(simple_preprocess(text), [tag])
        for tag, text in enumerate(texts)
    ]

    # Doc2Vec parameters that are not passed below:
    #   vector_size - dimensionality of the feature vectors
    #   window      - maximum distance between the current and predicted
    #                 word within a sentence
    #   min_count   - ignores all words with total frequency lower than this
    #   alpha       - the initial learning rate
    # Only ``workers`` and ``seed`` are set explicitly (presumably to keep
    # runs repeatable - TODO confirm).
    return Doc2Vec(tagged_docs, workers=1, seed=1)
###### Model Training - ONLY TO SAVE IN S3 BUCKET ######
def _cta_code_for(code_by_label, label):
    """Return the numeric code of ``label``, registering unseen labels as max + 1."""
    if label not in code_by_label:
        code_by_label[label] = max(code_by_label.values()) + 1
    return code_by_label[label]


def _cta_similar_candidates(recom_model, email_text, doc_cta_texts, pool, fallback,
                            selected_variable, selected_text):
    """Return ``pool`` rows whose CTA text belongs to the documents most similar to ``email_text``.

    ``doc_cta_texts[tag]`` is the text associated with the training document
    carrying that tag. When none of the 30 most similar documents matches a
    row of ``pool``, ``fallback`` is used instead. The result is de-duplicated
    by CTA text and then by rate, and never contains ``selected_text``.
    """
    words = simple_preprocess(email_text)
    doc_vector = recom_model.infer_vector(words)
    similar_docs = recom_model.dv.most_similar(positive=[doc_vector], topn=30)

    # Collect the matches in similarity order and concatenate once, instead of
    # growing a frame with pd.concat on every iteration.
    matches = []
    for entry in similar_docs:
        hit = pool[pool['cta_text'] == doc_cta_texts[entry[0]]]
        if not hit.empty:
            matches.append(hit)
    matched = pd.concat(matches) if matches else fallback

    by_text = matched.drop_duplicates(subset=['cta_text'], keep='last')
    by_rate = by_text.drop_duplicates(subset=[selected_variable], keep='last')
    return by_rate[by_rate['cta_text'] != selected_text]


def _cta_recommend_rows(candidates, selected_variable, output_rate):
    """Return ``(row, rate_percent)`` pairs to recommend from ``candidates``.

    The first three candidates whose rate beats ``output_rate`` are used
    (highest first) when there are at least two of them. Otherwise three
    candidates are sampled and paired with rates drawn from
    ``random.uniform`` slightly above ``output_rate``; those rates are
    synthetic, not values read from the data.
    """
    better = candidates[candidates[selected_variable] > output_rate]
    top = better.head(3).sort_values(by=[selected_variable], ascending=False)
    if len(top) >= 2:
        return [(row, round(row[selected_variable] * 100, 2)) for _, row in top.iterrows()]

    if candidates.empty:
        # Sampling from an empty frame raises; recommend nothing instead.
        return []

    # Sample with replacement only when fewer than three candidates exist.
    sampled = candidates.sample(n=3, replace=len(candidates) < 3)
    picks = []
    increment = output_rate + (0.02 * 3)
    for _, row in sampled.iterrows():
        target_rate = random.uniform(increment - 0.02, increment)
        increment = target_rate - 0.001
        picks.append((row, round(target_rate * 100, 2)))
    return picks


def get_predictions(selected_variable, selected_industry, selected_campaign,
                    selected_cta, email_text, cta_col, cta_txt, cta_menu):
    """Predict the target rate for the ticked CTAs and render recommendations.

    Loads the training data, the held-out test split and the fitted regressor
    for ``selected_variable``, scores the regressor on the test split, then,
    for every ticked entry of ``cta_menu``, shows the predicted rate and up to
    three alternative CTA colors and/or texts through Streamlit.

    NOTE(review): the prediction/recommendation step runs once per ticked
    entry. With exactly one ticked entry this is the same as using only the
    last ticked one - confirm which of the two was intended.

    Args:
        selected_variable: 'Click_To_Open_Rate' or 'Conversion_Rate'; also the
            name of the rate column read from the training data.
        selected_industry: Industry label looked up in the training data.
        selected_campaign: Campaign label; an unseen label gets a new code.
        selected_cta: 'Color', 'Text' or 'Both' - what to recommend. Any
            other value shows the prediction only.
        email_text: Email text used to find similar training documents.
        cta_col: CTA colors, parallel to ``cta_menu``.
        cta_txt: CTA texts, parallel to ``cta_menu``.
        cta_menu: Iterable of objects whose ``.value`` is truthy when the
            CTA at that position is selected.

    Returns:
        The R^2 score of the loaded regressor on the test split.

    Raises:
        ValueError: If ``selected_variable`` is not a supported target.
    """
    artifacts = {
        'Click_To_Open_Rate': ('Xtest_CTOR.csv', 'ytest_CTOR.csv',
                               'modelCTA_CTOR_new.sav'),
        'Conversion_Rate': ('Xtest_Conversion_Rate.csv', 'ytest_Conversion_Rate.csv',
                            'modelCTA_ConversionRate_new.sav'),
    }
    if selected_variable not in artifacts:
        # Previously this surfaced later as an unbound-variable error.
        raise ValueError('Unsupported target variable: {!r}'.format(selected_variable))
    X_name, y_name, model_file = artifacts[selected_variable]
    key = 'models/' + model_file

    training_dataset = get_files_from_aws('emailcampaigntrainingdata', 'ModelCTA/training.csv')
    X_test = get_files_from_aws('emailcampaigntrainingdata', 'ModelCTA/' + X_name)
    y_test = get_files_from_aws('emailcampaigntrainingdata', 'ModelCTA/' + y_name)

    # The model is read from the local path ``key``; the S3 download from
    # bucket 'sagemakermodelcta' is currently disabled.
    # NOTE: joblib.load unpickles the file - only load trusted artifacts.
    regr = joblib.load(key)

    # Map column 0 to column 4 of the training data, first occurrence wins
    # (presumably email body -> CTA text - TODO confirm against the CSV).
    # Positional access is explicit via iloc because integer keys on a
    # string-labelled Series are deprecated.
    email_body_dict = {}
    for doc_key, doc_value in zip(training_dataset.iloc[:, 0], training_dataset.iloc[:, 4]):
        email_body_dict.setdefault(doc_key, doc_value)
    texts = list(email_body_dict.values())

    y_pred = regr.predict(X_test)
    r2_test = r2_score(y_test, y_pred)

    # Document tags of this model line up with the positions in ``texts``.
    recom_model = text_embeddings(list(email_body_dict))

    industry_code_dict = dict(zip(training_dataset.industry, training_dataset.industry_code))
    campaign_code_dict = dict(zip(training_dataset.campaign, training_dataset.campaign_code))
    color_code_dict = dict(zip(training_dataset.cta_color, training_dataset.color_code))
    text_code_dict = dict(zip(training_dataset.cta_text, training_dataset.text_code))

    # Recommendation pool: training rows of the selected industry with the
    # rate rounded to 5 decimals, plus the same rows sorted ascending by rate
    # so that drop_duplicates(keep='last') keeps the best-rated duplicate.
    selected_industry_code = industry_code_dict.get(selected_industry)
    industry_rows = training_dataset[["industry_code", "campaign_code", "cta_color", "cta_text",
                                      selected_variable]]
    # .copy() so the assignment below does not write into a slice.
    industry_rows = industry_rows[industry_rows["industry_code"] == selected_industry_code].copy()
    industry_rows[selected_variable] = industry_rows[selected_variable].apply(lambda x: round(x, 5))
    ranked_rows = industry_rows.sort_values(by=[selected_variable])

    for cta_ind, checkbox in enumerate(cta_menu):  # For each CTA selected
        if not checkbox.value:
            continue
        selected_color = cta_col[cta_ind]
        selected_text = cta_txt[cta_ind]

        # Single feature row in the order industry, campaign, color, text.
        features = pd.DataFrame(
            [[selected_industry_code,
              _cta_code_for(campaign_code_dict, selected_campaign),
              _cta_code_for(color_code_dict, selected_color),
              _cta_code_for(text_code_dict, selected_text)]],
            columns=['industry_code', 'campaign_code', 'color_code', 'text_code'])
        # A row with a missing code (e.g. an industry absent from the training
        # data) is dropped, which leaves an empty input for predict().
        arr = features.dropna().to_numpy().astype('float64')
        output_rate = regr.predict(arr)[0]

        if output_rate < 0:
            st.text("Sorry, Current model couldn't provide predictions on the target variable you selected.")
        else:
            st.info('Model Prediction on the {} is {}'.format(selected_variable, round(output_rate*100, 2)))

        # Filter recommendations for CTA color, CTA text, or both.
        if selected_cta == 'Color':
            candidates = ranked_rows.drop_duplicates(subset=['cta_color'], keep='last')
            st.info('To get a higher {}, the model recommends the following options:'.format(selected_variable))
            picks = _cta_recommend_rows(candidates, selected_variable, output_rate)
            target_rate_arr = [rate for _, rate in picks]
            recom_cta_arr = [row['cta_color'] for row, _ in picks]
            cta_result = display_CTA_color(target_rate_arr, recom_cta_arr)
        elif selected_cta == 'Text':
            pool = ranked_rows.drop_duplicates(subset=['cta_text'], keep='last')
            candidates = _cta_similar_candidates(recom_model, email_text, texts, pool, pool,
                                                 selected_variable, selected_text)
            st.info('To get a higher {}, the model recommends the following options:'.format(selected_variable))
            picks = _cta_recommend_rows(candidates, selected_variable, output_rate)
            target_rate_arr = [rate for _, rate in picks]
            recom_cta_arr = [row['cta_text'] for row, _ in picks]
            cta_result = display_CTA_text(target_rate_arr, recom_cta_arr)
        elif selected_cta == 'Both':
            pool = ranked_rows.drop_duplicates(subset=['cta_color', 'cta_text'], keep='last')
            # NOTE(review): unlike the 'Text' case, the fallback here is the
            # unsorted industry frame rather than ``pool`` - confirm intended.
            candidates = _cta_similar_candidates(recom_model, email_text, texts, pool,
                                                 industry_rows, selected_variable, selected_text)
            st.info('To get a higher {}, the model recommends the following options:'.format(selected_variable))
            picks = _cta_recommend_rows(candidates, selected_variable, output_rate)
            target_rate_arr = [rate for _, rate in picks]
            recom_cta_color_arr = [row['cta_color'] for row, _ in picks]
            recom_cta_text_arr = [row['cta_text'] for row, _ in picks]
            cta_result = display_CTA_both(target_rate_arr, recom_cta_color_arr, recom_cta_text_arr)
        else:
            continue

        st.components.v1.html(cta_result, height=len(target_rate_arr)*30+50)

    return r2_test