Mastercard / 8_Scenario_Planner.py
BlendMMM's picture
Upload 81 files
94bbd2b verified
raw
history blame
No virus
21 kB
import streamlit as st
from numerize.numerize import numerize
import numpy as np
from functools import partial
from collections import OrderedDict
from plotly.subplots import make_subplots
import plotly.graph_objects as go
from utilities import format_numbers,load_local_css,set_header,initialize_data,load_authenticator,send_email,channel_name_formating
from classes import class_from_dict,class_to_dict
import pickle
import streamlit_authenticator as stauth
import yaml
from yaml import SafeLoader
import re
import pandas as pd
import plotly.express as px
# Label used for the sales-side metric widgets and the response-curve y axis.
target = 'Revenue'

st.set_page_config(layout='wide')
load_local_css('styles.css')
set_header()

# Write every session entry back onto itself, except the authentication /
# config keys and form-submitter keys.
# NOTE(review): presumably this keeps widget values alive when switching
# between pages of the app - confirm.
for k, v in st.session_state.items():
    if k in ['logout', 'login', 'config'] or k.startswith('FormSubmitter'):
        continue
    st.session_state[k] = v
# ======================================================== #
# ======================= Functions ====================== #
# ======================================================== #
def optimize():
    """
    Optimize the spends for the sales

    Collects the channels flagged in ``st.session_state['optimization_channels']``
    and, when at least one is flagged, calls ``scenario.optimize`` with the
    overall spends change and that channel list.  For every
    ``(channel_name, modified_spends)`` pair returned, the session state gets:

    * ``channel_name``: the modified spends multiplied by the channel's
      conversion rate, formatted with ``numerize``
    * ``f'{channel_name}_change'``: the percent change versus the channel's
      actual total spends, rounded to 2 decimals

    Does nothing when no channel is flagged.
    """
    selected_channels = [
        name
        for name, is_selected in st.session_state['optimization_channels'].items()
        if is_selected
    ]
    if not selected_channels:
        return

    scenario = st.session_state['scenario']
    result = scenario.optimize(st.session_state['total_spends_change'], selected_channels)
    for channel_name, modified_spends in result:
        channel = scenario.channels[channel_name]
        st.session_state[channel_name] = numerize(modified_spends * channel.conversion_rate, 1)
        prev_spends = channel.actual_total_spends
        # A channel without actual spends has no defined percent change;
        # store 0 instead of raising ZeroDivisionError inside the callback.
        if prev_spends:
            percent_change = round(100 * (modified_spends - prev_spends) / prev_spends, 2)
        else:
            percent_change = 0
        st.session_state[f'{channel_name}_change'] = percent_change
def save_scenario(scenario_name):
    """
    Save the current scenario with the mentioned name in the session state
    and pickle all saved scenarios to ``../saved_scenarios.pkl``.

    Parameters
    ----------
    scenario_name
        Name of the scenario to be saved
    """
    # Create only the container for saved scenarios when it is missing.
    # Assigning to ``st.session_state`` itself would replace the whole session
    # state and make the lookup below fail with KeyError.
    if 'saved_scenarios' not in st.session_state:
        st.session_state['saved_scenarios'] = OrderedDict()
    st.session_state['saved_scenarios'][scenario_name] = class_to_dict(st.session_state['scenario'])
    # Clear the scenario-name text input (its widget key is 'scenario_input').
    st.session_state['scenario_input'] = ""
    with open('../saved_scenarios.pkl', 'wb') as f:
        pickle.dump(st.session_state['saved_scenarios'], f)
def update_all_spends():
    """
    Apply the overall percent change of spends to every channel

    For each channel in ``st.session_state['channels_list']`` the actual total
    spends are scaled by ``1 + total_spends_change/100``, pushed into the
    scenario, and mirrored into the channel's spends text and percent-change
    session entries.
    """
    scenario = st.session_state['scenario']
    percent_change = st.session_state['total_spends_change']
    for channel_name in st.session_state['channels_list']:
        channel = scenario.channels[channel_name]
        modified_spends = (1 + percent_change/100) * channel.actual_total_spends
        scenario.update(channel_name, modified_spends)
        formatted_spends = numerize(modified_spends*channel.conversion_rate,1)
        st.session_state[channel_name] = formatted_spends
        st.session_state[f'{channel_name}_change'] = percent_change
def extract_number_for_string(string_input):
    """
    Convert a shorthand amount such as '1.5K', '20M' or '3B' to a float

    The suffix is matched case-insensitively: K scales by 10**3, M by 10**6
    and B by 10**9.  A string without one of these suffixes yields None.
    """
    text = string_input.upper()
    for suffix, exponent in (('K', 3), ('M', 6), ('B', 9)):
        if text.endswith(suffix):
            return float(text[:-1])*10**exponent
    return None
def validate_input(string_input):
    """
    Check that the text is a shorthand amount: digits, an optional decimal
    point with optional further digits, then exactly one uppercase K, M or B

    Parameters
    ----------
    string_input
        Text to validate, e.g. '250K' or '1.5M'

    Returns
    -------
    bool
        True when the whole string has the expected shape, False otherwise
    """
    # [KMB] rather than [K|M|B]: inside a character class '|' is a literal
    # character, so '5|' used to be accepted.  fullmatch also rejects a
    # trailing newline, which '$' with re.match let through.
    pattern = r'\d+\.?\d*[KMB]'
    return re.fullmatch(pattern, string_input) is not None
def update_data_by_percent(channel_name):
    """
    Updates the spends for the given channel from its percent-change input

    The channel's actual total spends (times its conversion rate) are scaled
    by the value stored under ``f'{channel_name}_change'``; the formatted
    result goes into the channel's spends text entry and the scenario is
    updated with the value divided by the conversion rate again.
    """
    scenario = st.session_state['scenario']
    channel = scenario.channels[channel_name]
    baseline_spends = channel.actual_total_spends * channel.conversion_rate
    percent_change = st.session_state[f'{channel_name}_change']
    modified_spends = baseline_spends * (1 + percent_change/100)
    st.session_state[channel_name] = numerize(modified_spends,1)
    scenario.update(channel_name, modified_spends/channel.conversion_rate)
def update_data(channel_name):
    """
    Updates the spends for the given channel

    Reads the text stored under the session key ``channel_name``.  When it
    passes ``validate_input`` it is converted with
    ``extract_number_for_string``; the percent change versus the channel's
    actual spends (times conversion rate) is stored under
    ``f'{channel_name}_change'`` and the scenario is updated with the entered
    value divided by the conversion rate.  Text that fails validation is
    ignored here.
    """
    entered_text = st.session_state[channel_name]
    if not validate_input(entered_text):
        return

    scenario = st.session_state['scenario']
    channel = scenario.channels[channel_name]
    modified_spends = extract_number_for_string(entered_text)
    prev_spends = channel.actual_total_spends * channel.conversion_rate
    # A channel without actual spends has no defined percent change;
    # store 0 instead of raising ZeroDivisionError inside the callback.
    if prev_spends:
        percent_change = round(100 * (modified_spends - prev_spends) / prev_spends, 2)
    else:
        percent_change = 0
    st.session_state[f'{channel_name}_change'] = percent_change
    scenario.update(channel_name, modified_spends / channel.conversion_rate)
def select_channel_for_optimization(channel_name):
    """
    Copy the channel's checkbox state into the optimization selection

    The value stored under ``f'{channel_name}_selected'`` becomes the
    channel's entry in ``st.session_state['optimization_channels']``.
    """
    is_checked = st.session_state[f'{channel_name}_selected']
    st.session_state['optimization_channels'][channel_name] = is_checked
def select_all_channels_for_optimization():
    """
    Apply the 'optimize all' checkbox state to every channel

    Both the per-channel checkbox entry and the channel's entry in
    ``st.session_state['optimization_channels']`` receive the value stored
    under ``'optimze_all_channels'``.
    """
    select_all = st.session_state['optimze_all_channels']
    selection = st.session_state['optimization_channels']
    for channel_name in selection:
        st.session_state[f'{channel_name}_selected' ] = select_all
        selection[channel_name] = select_all
def update_penalty():
    """
    Pass the 'apply_penalty' session value on to the scenario's penalty flag
    """
    scenario = st.session_state['scenario']
    scenario.update_penalty(st.session_state['apply_penalty'])
def reset_scenario():
    """
    Re-run ``initialize_data`` and clear the per-channel widget state

    Every channel gets its selection checkbox set to False and its percent
    change set to 0; the 'optimize all' checkbox is unchecked as well.
    """
    initialize_data()
    state = st.session_state
    for channel_name in state['channels_list']:
        for suffix, cleared_value in (('selected', False), ('change', 0)):
            state[f'{channel_name}_{suffix}'] = cleared_value
    state['optimze_all_channels'] = False
def format_number(num):
    """
    Format a number with an M (millions) or K (thousands) suffix

    Parameters
    ----------
    num
        Number to format; may be negative

    Returns
    -------
    str
        ``num`` in millions with 2 decimals and 'M' when its magnitude is at
        least 1,000,000; in thousands with no decimals and 'K' when its
        magnitude is at least 1,000; otherwise ``num`` with 2 decimals.
    """
    # Pick the suffix from the magnitude so negative values (e.g. deltas)
    # are abbreviated the same way as positive ones.
    magnitude = abs(num)
    if magnitude >= 1_000_000:
        return f"{num / 1_000_000:.2f}M"
    if magnitude >= 1_000:
        return f"{num / 1_000:.0f}K"
    return f"{num:.2f}"
def summary_plot(data, x, y, title, text_column):
    """
    Build a horizontal bar chart of ``data`` coloured by 'Channel_name'

    Parameters
    ----------
    data
        DataFrame to plot; its ``text_column`` is coerced to numeric in place
    x, y
        Column names for the bar lengths and the bar categories
    title
        Figure title
    text_column
        Column whose values are shown as bar labels

    Returns
    -------
    The plotly figure
    """
    bar_options = dict(x=x, y=y, orientation='h',
                       title=title, text=text_column, color='Channel_name')
    fig = px.bar(data, **bar_options)

    # NOTE(review): this coercion runs after the figure has been created, so
    # it changes the caller's DataFrame but presumably not the figure - confirm
    # whether it was meant to run before px.bar.
    numeric_text = pd.to_numeric(data[text_column], errors='coerce')
    data[text_column] = numeric_text

    # Bar labels outside the bars, labels and hover values in SI-style short form
    fig.update_traces(texttemplate='%{text:.2s}', textposition='outside', hovertemplate='%{x:.2s}')
    fig.update_layout(xaxis_title=x, yaxis_title='Channel Name', showlegend=False)
    return fig
def s_curve(x, K, b, a, x0):
    """
    Evaluate the logistic curve ``K / (1 + b * exp(-a * (x - x0)))``

    ``x`` may be a scalar or a numpy array; the other arguments are the
    curve parameters.
    """
    decay = np.exp(-a*(x-x0))
    denominator = 1 + b*decay
    return K / denominator
# NOTE(review): st.cache is marked deprecated in recent Streamlit releases -
# verify against the installed version. The function takes no arguments and
# reads st.session_state and the module-level channels_list.
@st.cache
def plot_response_curves():
    """
    Build a grid of response-curve subplots, one per entry of the
    module-level ``channels_list``.

    For every channel the fitted s-curve parameters are read from
    ``st.session_state['rcs']`` and the observed spends from
    ``st.session_state['actual_df']``.  Each subplot shows the curve, a black
    marker at the channel's total observed spends, and two dashed lines from
    the axes to that marker.

    Returns
    -------
    The plotly figure created by ``make_subplots``
    """
    cols=4
    rcs = st.session_state['rcs']
    shapes = []
    # Fixed grid of 6 rows x 4 columns; subplot i goes to row 1 + i // cols.
    fig = make_subplots(rows=6, cols=cols,subplot_titles=channels_list)
    for i in range(0, len(channels_list)):
        col = channels_list[i]
        x = st.session_state['actual_df'][col].values
        # Total observed spends of the channel (before x is replaced below)
        spends = x.sum()
        # Base-10 order of magnitude of the largest value, minus 3; used to
        # scale spends before they are fed to the s-curve.
        power = (np.ceil(np.log(x.max()) / np.log(10) )- 3)
        # Evaluation grid from 0 up to three times the largest observed value
        x = np.linspace(0,3*x.max(),200)
        K = rcs[col]['K']
        b = rcs[col]['b']
        a = rcs[col]['a']
        x0 = rcs[col]['x0']
        y = s_curve(x/10**power,K,b,a,x0)
        # NOTE(review): the first grid point is 0, so the first roi entry is a
        # division by zero.
        roi = y/x
        # Derivative of the s-curve with respect to its (scaled) input
        marginal_roi = a * (y)*(1-y/K)
        # NOTE(review): the factor 52 presumably converts weekly values to a
        # yearly total - confirm.
        fig.add_trace(
            go.Scatter(x=52*x*st.session_state['scenario'].channels[col].conversion_rate,
                       y=52*y,
                       name=col,
                       customdata = np.stack((roi, marginal_roi),axis=-1),
                       hovertemplate="Spend:%{x:$.2s}<br>Sale:%{y:$.2s}<br>ROI:%{customdata[0]:.3f}<br>MROI:%{customdata[1]:.3f}"),
            row=1+(i)//cols , col=i%cols + 1
        )
        # Black marker at the channel's total observed spends
        fig.add_trace(go.Scatter(x=[spends*st.session_state['scenario'].channels[col].conversion_rate],
                                 y=[52*s_curve(spends/(10**power*52),K,b,a,x0)],
                                 name=col,
                                 legendgroup=col,
                                 showlegend=False,
                                 marker=dict(color=['black'])),
                      row=1+(i)//cols , col=i%cols + 1)
        # Horizontal dashed line from the y axis to the marker
        shapes.append(go.layout.Shape(type="line",
                                      x0=0,
                                      y0=52*s_curve(spends/(10**power*52),K,b,a,x0),
                                      x1=spends*st.session_state['scenario'].channels[col].conversion_rate,
                                      y1=52*s_curve(spends/(10**power*52),K,b,a,x0),
                                      line_width=1,
                                      line_dash="dash",
                                      line_color="black",
                                      xref= f'x{i+1}',
                                      yref= f'y{i+1}'))
        # Vertical dashed line from the x axis to the marker
        shapes.append(go.layout.Shape(type="line",
                                      x0=spends*st.session_state['scenario'].channels[col].conversion_rate,
                                      y0=0,
                                      x1=spends*st.session_state['scenario'].channels[col].conversion_rate,
                                      y1=52*s_curve(spends/(10**power*52),K,b,a,x0),
                                      line_width=1,
                                      line_dash="dash",
                                      line_color="black",
                                      xref= f'x{i+1}',
                                      yref= f'y{i+1}'))
    fig.update_layout(height=1500, width=1000, title_text="Response Curves",showlegend=False,shapes=shapes)
    fig.update_annotations(font_size=10)
    fig.update_xaxes(title='Spends')
    fig.update_yaxes(title=target)
    return fig
# ======================================================== #
# ==================== HTML Components =================== #
# ======================================================== #
def generate_spending_header(heading):
    """
    Render ``heading`` as an <h2> element with the 'spends-header' CSS class
    and return the result of ``st.markdown``
    """
    header_html = f"""<h2 class="spends-header">{heading}</h2>"""
    return st.markdown(header_html, unsafe_allow_html=True)
# ======================================================== #
# =================== Session variables ================== #
# ======================================================== #
# Load the authenticator settings and keep them in the session so the
# forgot-password flow can update the stored credentials.
with open('config.yaml') as file:
    config = yaml.load(file, Loader=SafeLoader)
st.session_state['config'] = config

authenticator = stauth.Authenticate(
    credentials=config['credentials'],
    cookie_name=config['cookie']['name'],
    key=config['cookie']['key'],
    cookie_expiry_days=config['cookie']['expiry_days'],
    preauthorized=config['preauthorized'],
) if False else stauth.Authenticate(
    config['credentials'],
    config['cookie']['name'],
    config['cookie']['key'],
    config['cookie']['expiry_days'],
    config['preauthorized'],
)
st.session_state['authenticator'] = authenticator

# Render the login form in the main area; the page branches on the
# authentication status stored in the session state.
name, authentication_status, username = authenticator.login('Login', 'main')
auth_status = st.session_state.get('authentication_status')
# Main page body: rendered only for an authenticated user; a failed login
# shows an error instead.
if auth_status == True:
    authenticator.logout('Logout', 'main')
    # Load the scenario data once per session
    is_state_initiaized = st.session_state.get('initialized',False)
    if not is_state_initiaized:
        initialize_data()
    channels_list = st.session_state['channels_list']
    # ======================================================== #
    # ========================== UI ========================== #
    # ======================================================== #
    print(list(st.session_state.keys()))
    st.header('Simulation')
    # Top summary: actual vs simulated totals for spends and the target KPI
    main_header = st.columns((2,2))
    sub_header = st.columns((1,1,1,1))
    _scenario = st.session_state['scenario']
    with main_header[0]:
        st.subheader('Actual')
    with main_header[-1]:
        st.subheader('Simulated')
    with sub_header[0]:
        st.metric(label = 'Spends', value=format_numbers(_scenario.actual_total_spends))
    with sub_header[1]:
        st.metric(label = target, value=format_numbers(float(_scenario.actual_total_sales),include_indicator=False))
    with sub_header[2]:
        st.metric(label = 'Spends',
                  value=format_numbers(_scenario.modified_total_spends),
                  delta=numerize(_scenario.delta_spends,1))
    with sub_header[3]:
        st.metric(label = target,
                  value=format_numbers(float(_scenario.modified_total_sales),include_indicator=False),
                  delta=numerize(_scenario.delta_sales,1))
    with st.expander("Channel Spends Simulator"):
        # Global controls: select-all checkbox, overall percent change,
        # optimize and reset buttons (column 1 is left empty)
        _columns = st.columns((2,4,1,1))
        with _columns[0]:
            st.checkbox(label='Optimize all Channels',
                        key=f'optimze_all_channels',
                        value=False,
                        on_change=select_all_channels_for_optimization,
                        )
            st.number_input('Percent change of total spends',
                            key=f'total_spends_change',
                            step= 1,
                            on_change=update_all_spends)
        with _columns[2]:
            st.button('Optimize',on_click=optimize)
        with _columns[3]:
            st.button('Reset',on_click=reset_scenario)
        st.markdown("""<hr class="spends-heading-seperator">""", unsafe_allow_html=True)
        # Column headings of the per-channel table
        _columns = st.columns((2.5,2,1.5,1.5,1))
        with _columns[0]:
            generate_spending_header('Channel')
        with _columns[1]:
            generate_spending_header('Spends Input')
        with _columns[2]:
            generate_spending_header('Spends')
        with _columns[3]:
            generate_spending_header(target)
        with _columns[4]:
            generate_spending_header('Optimize')
        st.markdown("""<hr class="spends-heading-seperator">""", unsafe_allow_html=True)
        # Per-channel spends collected for the summary DataFrame built below.
        # NOTE(review): the lists are created once per session and appended to
        # on every script run, so they keep growing; the drop_duplicates call
        # further down keeps only the latest row per channel.
        if 'acutual_predicted' not in st.session_state:
            st.session_state['acutual_predicted']={'Channel_name':[],
                                                   'Actual_spend':[],
                                                   'Optimized_spend':[],
                                                   'Delta':[]
                                                   }
        # One table row per channel (the enumerate index i is not used)
        for i,channel_name in enumerate(channels_list):
            _channel_class = st.session_state['scenario'].channels[channel_name]
            _columns = st.columns((2.5,1.5,1.5,1.5,1))
            with _columns[0]:
                st.write(channel_name_formating(channel_name))
            with _columns[1]:
                # min_value / max_value are computed from the channel bounds
                # but not used by the widgets below
                channel_bounds = _channel_class.bounds
                channel_spends = float(_channel_class.actual_total_spends )
                min_value = float((1+channel_bounds[0]/100) * channel_spends )
                max_value = float((1+channel_bounds[1]/100) * channel_spends )
                #print(st.session_state[channel_name])
                # Spends as shorthand text (e.g. '1.5M'); the widget key is the
                # channel name itself
                spend_input = st.text_input(channel_name,
                                            key=channel_name,
                                            label_visibility='collapsed',
                                            on_change=partial(update_data,channel_name))
                if not validate_input(spend_input):
                    st.error('Invalid input')
                st.number_input('Percent change',
                                key=f'{channel_name}_change',
                                step= 1,
                                on_change=partial(update_data_by_percent,channel_name))
            with _columns[2]:
                # spends
                current_channel_spends = float(_channel_class.modified_total_spends * _channel_class.conversion_rate)
                actual_channel_spends = float(_channel_class.actual_total_spends * _channel_class.conversion_rate)
                spends_delta = float(_channel_class.delta_spends * _channel_class.conversion_rate)
                st.session_state['acutual_predicted']['Channel_name'].append(channel_name)
                st.session_state['acutual_predicted']['Actual_spend'].append(actual_channel_spends)
                st.session_state['acutual_predicted']['Optimized_spend'].append(current_channel_spends)
                st.session_state['acutual_predicted']['Delta'].append(spends_delta)
                ## REMOVE
                st.metric('Spends',
                          format_numbers(current_channel_spends),
                          delta=numerize(spends_delta,1),
                          label_visibility='collapsed')
            with _columns[3]:
                # sales (actual_channel_sales is computed but not displayed)
                current_channel_sales = float(_channel_class.modified_total_sales)
                actual_channel_sales = float(_channel_class.actual_total_sales)
                sales_delta = float(_channel_class.delta_sales)
                st.metric(target,
                          format_numbers(current_channel_sales,include_indicator=False),
                          delta=numerize(sales_delta,1),
                          label_visibility='collapsed')
            with _columns[4]:
                st.checkbox(label='select for optimization',
                            key=f'{channel_name}_selected',
                            value=False,
                            on_change=partial(select_channel_for_optimization,channel_name),
                            label_visibility='collapsed')
            st.markdown("""<hr class="spends-child-seperator">""",unsafe_allow_html=True)
    with st.expander("See Response Curves"):
        fig = plot_response_curves()
        st.plotly_chart(fig,use_container_width=True)
    _columns = st.columns(2)
    with _columns[0]:
        st.subheader('Save Scenario')
        scenario_name = st.text_input('Scenario name', key='scenario_input',placeholder='Scenario name',label_visibility='collapsed')
        # The Save button stays disabled while the scenario name is empty
        st.button('Save', on_click=lambda : save_scenario(scenario_name),disabled=len(st.session_state['scenario_input']) == 0)
    # Summary of actual vs simulated spends per channel, written to
    # summary_df.pkl on every script run
    summary_df=pd.DataFrame(st.session_state['acutual_predicted'])
    summary_df.drop_duplicates(subset='Channel_name',keep='last',inplace=True)
    summary_df_sorted = summary_df.sort_values(by='Delta', ascending=False)
    summary_df_sorted['Delta_percent'] = np.round(((summary_df_sorted['Optimized_spend'] / summary_df_sorted['Actual_spend'])-1) * 100, 2)
    with open("summary_df.pkl", "wb") as f:
        pickle.dump(summary_df_sorted, f)
    #st.dataframe(summary_df_sorted)
    # ___columns=st.columns(3)
    # with ___columns[2]:
    # fig=summary_plot(summary_df_sorted, x='Delta_percent', y='Channel_name', title='Delta', text_column='Delta_percent')
    # st.plotly_chart(fig,use_container_width=True)
    # with ___columns[0]:
    # fig=summary_plot(summary_df_sorted, x='Actual_spend', y='Channel_name', title='Actual Spend', text_column='Actual_spend')
    # st.plotly_chart(fig,use_container_width=True)
    # with ___columns[1]:
    # fig=summary_plot(summary_df_sorted, x='Optimized_spend', y='Channel_name', title='Planned Spend', text_column='Optimized_spend')
    # st.plotly_chart(fig,use_container_width=True)
elif auth_status == False:
    st.error('Username/Password is incorrect')
# Offer the forgot-password widget to anyone who is not logged in.
if auth_status != True:
    try:
        username_forgot_pw, email_forgot_password, random_password = authenticator.forgot_password('Forgot password')
        if username_forgot_pw:
            # Store the hash of the generated password in the session copy of
            # the config, then mail the plain password to the user.
            hashed_password = stauth.Hasher([random_password]).generate()[0]
            user_records = st.session_state['config']['credentials']['usernames']
            user_records[username_forgot_pw]['password'] = hashed_password
            send_email(email_forgot_password, random_password)
            st.success('New password sent securely')
            # Random password to be transferred to user securely
        elif username_forgot_pw == False:
            st.error('Username not found')
    except Exception as e:
        st.error(e)