"""Helpers for parsing call-to-action (CTA) buttons out of uploaded emails,
predicting a campaign rate for them and recommending alternative CTAs.

NOTE(review): this module was recovered from a copy whose line structure and
inline HTML markup had been stripped.  Block nesting and the HTML emitted by
the ``display_CTA_*`` helpers were reconstructed -- confirm against the
deployed app before relying on exact rendering.
"""
import email
import html
import random
import re
import tempfile
from hashlib import shake_128
from io import StringIO

import boto3
import joblib
import numpy as np
import pandas as pd
import streamlit as st
from bs4 import BeautifulSoup
from gensim.models.doc2vec import Doc2Vec, TaggedDocument
from gensim.utils import simple_preprocess
from IPython.display import display
from sklearn.metrics import r2_score

s3 = boto3.resource('s3')
s3_client = boto3.client('s3')

# Separator between CTA rows.  The result is rendered with
# st.components.v1.html, where a bare newline would not break the line.
_CTA_ROW_SEPARATOR = "<br>"

# Codes from data/html_tags.csv that map to a fixed character instead of the
# symbol stored in the file.
_CONTROL_CODE_REPLACEMENTS = {'=09': '\t', '=0D': '\n', '=0A': '\n', '=22': '"'}

_MARKUP_RE = re.compile('<.*?>')

# Columns used to build the recommendation table in get_predictions.
_RECOM_BASE_COLUMNS = ["industry_code", "campaign_code", "cta_color", "cta_text"]


def get_files_from_aws(bucket, prefix):
    """
    Download a CSV object from S3 and return it as a DataFrame.

    bucket (STRING): bucket name
    prefix (STRING): file location (object key) in the s3 bucket

    Credentials are read from ``st.secrets["aws_id"]`` / ``st.secrets["aws_key"]``.
    The first CSV column is consumed as the index and then discarded by
    ``reset_index(drop=True)``.
    """
    s3_client = boto3.client('s3',
                             aws_access_key_id=st.secrets["aws_id"],
                             aws_secret_access_key=st.secrets["aws_key"])
    file_obj = s3_client.get_object(Bucket=bucket, Key=prefix)
    body = file_obj['Body']
    string = body.read().decode('utf-8')
    df = pd.read_csv(StringIO(string), encoding="ISO-8859-1", index_col=0)
    df = df.reset_index(drop=True)
    return df


def display_CTA_color(text, color):
    """
    Build the HTML listing for colour recommendations.

    text: sequence of percentages (get_predictions passes the target rates
        here despite the parameter name).
    color: sequence of CTA colours, parallel to ``text``.

    Returns one "CTA Number i: <color> Percentage: <pct>%" row per item,
    joined with ``<br>``.  Previously the format string had only two
    placeholders for three arguments, so the colour was printed as the
    percentage and the real percentage was dropped.
    """
    rows = [
        "CTA Number {}: {} Percentage: {}%".format(
            number, html.escape(str(shade)), pct)
        for number, (pct, shade) in enumerate(zip(text, color), start=1)
    ]
    return _CTA_ROW_SEPARATOR.join(rows)


def display_CTA_text(percentage, text):
    """
    Build the HTML listing for text recommendations.

    percentage: sequence of percentages.
    text: sequence of CTA texts, parallel to ``percentage``; shown upper-cased.

    Returns one "CTA Number i: <TEXT> Percentage: <pct>%" row per item,
    joined with ``<br>``.
    """
    rows = [
        "CTA Number {}: {} Percentage: {}%".format(
            number, html.escape(str(label).upper()), pct)
        for number, (pct, label) in enumerate(zip(percentage, text), start=1)
    ]
    return _CTA_ROW_SEPARATOR.join(rows)


def display_CTA_both(percentage, color, text):
    """
    Build the HTML listing for combined colour + text recommendations.

    percentage, color, text: parallel sequences; text is shown upper-cased.

    Returns one "CTA Number i: <color> <TEXT> Percentage: <pct>%" row per
    item, joined with ``<br>``.
    """
    rows = [
        "CTA Number {}: {} {} Percentage: {}%".format(
            number, html.escape(str(shade)), html.escape(str(label).upper()), pct)
        for number, (pct, shade, label)
        in enumerate(zip(percentage, color, text), start=1)
    ]
    return _CTA_ROW_SEPARATOR.join(rows)


## "=",=3D removed from html_tags.csv
def preprocess_text(doc):
    """
    Decode the ``=XX`` escape codes listed in data/html_tags.csv inside ``doc``.

    Each CSV line is read as ``symbol,code``.  A line that splits into more
    than two fields is taken to describe the comma symbol itself, with the
    code in the third field.  A few control codes (tab, CR, LF, double quote)
    are mapped to fixed characters regardless of the symbol column.

    After the table substitutions, ``=3D`` is turned into ``=`` and every
    ``=`` immediately followed by a newline (a soft line break) is removed.

    Returns the decoded string.
    """
    tags = {}
    with open('data/html_tags.csv', 'r') as html_tags:
        for line in html_tags:
            fields = line.strip().split(',')
            if len(fields) < 2:
                # Blank or malformed line: nothing to map.
                continue
            symbol = fields[0].strip('"')
            code = fields[1]
            if len(fields) > 2:
                symbol = ','
                code = fields[2]
            if not code:
                # An empty key would make str.replace insert the symbol
                # between every character of the document.
                continue
            tags[code] = _CONTROL_CODE_REPLACEMENTS.get(code, symbol)

    for key, val in tags.items():
        doc = doc.replace(key, val)

    # Protect encoded '=' while the soft line breaks ('=' + newline) are removed.
    doc = doc.replace('=3D', '%3D')
    doc = doc.replace('=\n', '')
    doc = doc.replace('%3D', '=')
    return doc


def rgb_to_hex(rgb):
    """Return ``#rrggbb`` for an ``(r, g, b)`` triple; channels are clamped to 0-255.

    parse_features_from_html called this name without it being defined or
    imported, so every ``rgb(...)`` colour used to be dropped.
    """
    red, green, blue = (max(0, min(255, int(channel))) for channel in rgb)
    return '#{:02x}{:02x}{:02x}'.format(red, green, blue)


def _read_stripped_lines(path):
    """Return the lines of ``path`` with surrounding whitespace stripped."""
    with open(path, 'r') as handle:
        return [ln.strip() for ln in handle]


def _button_background_color(style):
    """Return the background colour of a button-like inline style, else None.

    A style counts as button-like when it mentions ``background-color``,
    ``display`` and ``border-radius``.  When several ``background-color``
    declarations are present the last one is returned.
    """
    style = style.replace('\r', '').replace('\n', '')
    if not all(prop in style
               for prop in ('background-color', 'display', 'border-radius')):
        return None
    found = None
    for declaration in style.split(';'):
        if 'background-color' not in declaration:
            continue
        cl = declaration.split(':')[1].lower()
        cl = cl.replace('!important', '').replace('=', '')
        if cl.strip() == 'transparent':
            cl = '#00ffffff'
        if 'rgb' in cl:
            rgb = cl[cl.index('(') + 1:cl.index(')')].split(',')
            cl = rgb_to_hex((int(rgb[0]), int(rgb[1]), int(rgb[2])))
        found = cl.strip()
    return found


def parse_features_from_html(body, soup):
    """
    Extract the visible text and the CTA buttons (colour + text) of an email.

    body: unused; kept for interface compatibility.
    soup: BeautifulSoup tree of the email HTML.

    An ``<a href=...>`` element is a CTA candidate when its inline style looks
    like a button (see _button_background_color) and it has a plain string
    content.  A candidate is kept when its lower-cased text is listed in
    data/cta_text_list.txt or contains any entry of data/cta_verbs_list.txt.

    Returns ``(visible_text, colors, texts)`` where ``colors`` and ``texts``
    are parallel lists.
    """
    cta_list = _read_stripped_lines('data/cta_text_list.txt')
    cta_verbs = _read_stripped_lines('data/cta_verbs_list.txt')

    # extracting visible text:
    vtexts = preprocess_text(soup.get_text())
    vtexts = " ".join(vtexts.split())

    candidates = []  # (text, color) pairs, kept together so they cannot drift apart
    for anchor in soup.find_all('a', {'href': True}):
        style = anchor.get('style')
        label = anchor.string  # None when the anchor wraps nested markup
        if style is None or label is None:
            continue
        try:
            shade = _button_background_color(style)
        except (IndexError, ValueError):
            # Malformed declaration (no ':' / unparsable rgb()): skip this anchor.
            continue
        if shade is None:
            continue
        cleaned = _MARKUP_RE.sub('', label.replace('\n', '').replace('\t', ' ')).lower()
        # str.replace returns a new string; the result used to be discarded.
        cleaned = cleaned.replace('→', '').replace('\t', ' ')
        candidates.append((cleaned.strip(), shade))

    op_color = []
    op_text = []
    for label, shade in candidates:
        # any() keeps a CTA once even if several verbs match it.
        if label in cta_list or any(verb in label for verb in cta_verbs):
            op_text.append(label)
            op_color.append(shade)
    return vtexts, op_color, op_text


## Parsed email from email_upload()
## RETURN: Each CTA text and it's color as lists
def email_parser(parsed_email):
    """
    Parse an uploaded email and return its visible text and CTAs.

    parsed_email: iterable of strings (the email source, line by line).

    The payload of the last part yielded by ``Message.walk()`` is used as the
    body.  Returns ``(visible_text, cta_colors, cta_texts)``.
    """
    emailstr = "".join(parsed_email)
    message = email.message_from_string(emailstr)
    body = ""
    for part in message.walk():
        if part.get_content_type():
            body = str(part.get_payload())
    doc = preprocess_text(body)
    # NOTE(review): no parser is named, so BeautifulSoup picks whichever is
    # installed; pin one (e.g. "html.parser") once the deployed choice is known.
    soup = BeautifulSoup(doc)
    ## Get CTA features from soup items of emails
    vtext, ccolor, text = parse_features_from_html(body, soup)
    return vtext, ccolor, text


## Generate word embeddings for each CTA text using Doc2Vec
def text_embeddings(texts):
    """
    Train a Doc2Vec model on ``texts``.

    Each text is tokenised with ``simple_preprocess`` and tagged with its
    position in ``texts``.  ``workers=1`` and ``seed=1`` are passed to Doc2Vec.

    Returns the trained model.
    """
    text_tokens = [
        TaggedDocument(simple_preprocess(tx), [position])
        for position, tx in enumerate(texts)
    ]
    return Doc2Vec(text_tokens, workers=1, seed=1)


def _code_for(code_dict, key):
    """Return the numeric code of ``key``, registering max+1 for an unseen key."""
    if key not in code_dict:
        code_dict[key] = max(code_dict.values(), default=-1) + 1
    return code_dict[key]


def _similar_text_candidates(recom_model, texts, email_text, pool,
                             selected_variable, selected_text):
    """Return rows of ``pool`` whose CTA text belongs to the documents most
    similar to ``email_text``, most similar first.

    Falls back to the whole ``pool`` when nothing matches.  Rows are then
    de-duplicated by CTA text and by rate (keeping the last occurrence) and
    the currently selected text is excluded.
    """
    words = simple_preprocess(email_text)
    test_doc_vector = recom_model.infer_vector(words)
    recom_similar = recom_model.dv.most_similar(positive=[test_doc_vector], topn=30)

    matches = []
    for tag, _score in recom_similar:
        rows = pool[pool['cta_text'] == texts[tag]]
        if not rows.empty:
            matches.append(rows)
    # One concat at the end instead of growing a frame inside the loop.
    similar_rows = pd.concat(matches) if matches else pool

    similar_rows = similar_rows.drop_duplicates(subset=['cta_text'], keep='last')
    similar_rows = similar_rows.drop_duplicates(subset=[selected_variable], keep='last')
    return similar_rows[similar_rows['cta_text'] != selected_text]


def _pick_recommendations(candidates, selected_variable, output_rate):
    """Return up to three ``(color, text, rate_percent)`` recommendations.

    The first three candidates whose rate exceeds ``output_rate`` are used,
    ordered by descending rate.  When fewer than two exist, three candidates
    are sampled instead (with replacement if fewer than three are available).

    NOTE(review): in the sampled case the reported rates are drawn with
    ``random.uniform`` just above ``output_rate``; they are not taken from
    the data or the model.
    """
    better = candidates[candidates[selected_variable] > output_rate]
    ranked = better.head(3).sort_values(by=[selected_variable], ascending=False)
    if len(ranked) >= 2:
        return [
            (shade, label, round(rate * 100, 2))
            for shade, label, rate in zip(ranked['cta_color'], ranked['cta_text'],
                                          ranked[selected_variable])
        ]

    if candidates.empty:
        # DataFrame.sample cannot draw from an empty frame.
        return []

    sampled = candidates.sample(n=3, replace=len(candidates) < 3)
    picks = []
    increment = output_rate + (0.02 * 3)
    for shade, label in zip(sampled['cta_color'], sampled['cta_text']):
        target_rate = random.uniform(increment - 0.02, increment)
        increment = target_rate - 0.001
        picks.append((shade, label, round(target_rate * 100, 2)))
    return picks


def _show_recommendations(selected_cta, selected_variable, picks):
    """Render the recommendation header and listing for one CTA."""
    if not picks:
        st.text("Sorry, no recommendation data is available for the industry you selected.")
        return
    st.info('To get a higher {}, the model recommends the following options:'.format(selected_variable))
    colors = [pick[0] for pick in picks]
    labels = [pick[1] for pick in picks]
    rates = [pick[2] for pick in picks]
    if selected_cta == 'Color':
        cta_result = display_CTA_color(rates, colors)
    elif selected_cta == 'Text':
        cta_result = display_CTA_text(rates, labels)
    else:
        cta_result = display_CTA_both(rates, colors, labels)
    st.components.v1.html(cta_result, height=len(rates) * 30 + 50)


###### Model Training - ONLY TO SAVE IN S3 BUCKET ######
def get_predictions(selected_variable, selected_industry, selected_campaign,
                    selected_cta, email_text, cta_col, cta_txt, cta_menu):
    """
    Predict ``selected_variable`` for each selected CTA and show recommendations.

    selected_variable: 'Click_To_Open_Rate' or 'Conversion_Rate'.
    selected_industry, selected_campaign: values looked up in the training data
        code tables; unseen campaigns/colours/texts get a new code (max + 1).
    selected_cta: 'Color', 'Text' or 'Both' -- which recommendation to show;
        any other value shows the prediction only.
    email_text: visible text of the uploaded email (used for similarity search).
    cta_col, cta_txt: parallel lists of CTA colours and texts of the email.
    cta_menu: sequence of objects with a ``.value`` attribute; entries whose
        value equals True mark the CTAs to evaluate.

    Results are written to the Streamlit page.  Returns the R^2 score of the
    loaded model on the stored test split.

    Raises ValueError for an unsupported ``selected_variable`` or when the
    feature vector cannot be built (e.g. an industry absent from the training
    data).

    NOTE(review): block nesting was reconstructed -- the per-CTA work is
    assumed to sit inside the ``cta_menu`` loop, and recommendations are
    assumed to be skipped when the predicted rate is negative.
    """
    if selected_variable == 'Click_To_Open_Rate':
        X_name = 'Xtest_CTOR.csv'
        y_name = 'ytest_CTOR.csv'
        key = 'models/' + 'modelCTA_CTOR_new.sav'
    elif selected_variable == 'Conversion_Rate':
        X_name = 'Xtest_Conversion_Rate.csv'
        y_name = 'ytest_Conversion_Rate.csv'
        key = 'models/' + 'modelCTA_ConversionRate_new.sav'
    else:
        raise ValueError(
            "Unsupported target variable: {!r}".format(selected_variable))

    training_dataset = get_files_from_aws('emailcampaigntrainingdata', 'ModelCTA/training.csv')
    X_test = get_files_from_aws('emailcampaigntrainingdata', 'ModelCTA/' + X_name)
    y_test = get_files_from_aws('emailcampaigntrainingdata', 'ModelCTA/' + y_name)

    # The model is read from the local ``models/`` directory (the S3 download
    # was already disabled).  joblib.load unpickles the file: only load
    # artefacts from a trusted source.
    regr = joblib.load(key)

    # First column -> fifth column, keeping the first occurrence of each key.
    # Explicit .iloc: positional ``row[0]`` on a labelled Series is deprecated.
    email_body_dict = {}
    for body_key, body_value in zip(training_dataset.iloc[:, 0],
                                    training_dataset.iloc[:, 4]):
        email_body_dict.setdefault(body_key, body_value)
    texts = list(email_body_dict.values())

    y_pred = regr.predict(X_test)
    r2_test = r2_score(y_test, y_pred)

    ## Get recommendation
    # Document tag i of the model corresponds to texts[i].
    recom_model = text_embeddings(list(email_body_dict))

    industry_code_dict = dict(zip(training_dataset.industry, training_dataset.industry_code))
    campaign_code_dict = dict(zip(training_dataset.campaign, training_dataset.campaign_code))
    color_code_dict = dict(zip(training_dataset.cta_color, training_dataset.color_code))
    text_code_dict = dict(zip(training_dataset.cta_text, training_dataset.text_code))

    industry_code = industry_code_dict.get(selected_industry)

    ### Create dataset for recommendation (same for every selected CTA)
    # select the certain industry that user selected
    df_recom = training_dataset[_RECOM_BASE_COLUMNS + [selected_variable]]
    # .copy() so the rounded column is written to an independent frame.
    df_recom = df_recom[df_recom["industry_code"] == industry_code].copy()
    df_recom[selected_variable] = df_recom[selected_variable].apply(lambda x: round(x, 5))
    df_recom_sort = df_recom.sort_values(by=[selected_variable])

    for ip_idx, ip in enumerate(cta_menu):
        # For each CTA selected
        if not (ip.value == True):  # noqa: E712 -- keep the original equality test
            continue
        selected_color = cta_col[ip_idx]
        selected_text = cta_txt[ip_idx]

        features = np.array([[
            industry_code,
            _code_for(campaign_code_dict, selected_campaign),
            _code_for(color_code_dict, selected_color),
            _code_for(text_code_dict, selected_text),
        ]], dtype='float64')
        if np.isnan(features).any():
            # Previously the row was silently dropped and predict() failed on
            # an empty array.
            raise ValueError(
                "Cannot encode the selection for prediction "
                "(industry {!r} may be missing from the training data)".format(
                    selected_industry))

        output_rate = regr.predict(features)[0]
        if output_rate < 0:
            st.text("Sorry, Current model couldn't provide predictions on the target variable you selected.")
            continue
        st.info('Model Prediction on the {} is {}'.format(selected_variable, round(output_rate*100, 2)))

        ## Filter recommendatins for either CTA text or color
        if selected_cta == 'Color':
            candidates = df_recom_sort.drop_duplicates(subset=['cta_color'], keep='last')
        elif selected_cta == 'Text':
            pool = df_recom_sort.drop_duplicates(subset=['cta_text'], keep='last')
            candidates = _similar_text_candidates(
                recom_model, texts, email_text, pool, selected_variable, selected_text)
        elif selected_cta == 'Both':
            # The no-match fallback now uses this sorted, de-duplicated pool
            # (it used to fall back to the unsorted industry table).
            pool = df_recom_sort.drop_duplicates(subset=['cta_color', 'cta_text'], keep='last')
            candidates = _similar_text_candidates(
                recom_model, texts, email_text, pool, selected_variable, selected_text)
        else:
            continue

        picks = _pick_recommendations(candidates, selected_variable, output_rate)
        _show_recommendations(selected_cta, selected_variable, picks)

    return r2_test