import streamlit as st
import pandas as pd
import time
from datetime import datetime
import numpy as np
import pmdarima as pm
import matplotlib.pyplot as plt
from pmdarima import auto_arima
# import plotly.graph_objects as go
import torch
from transformers import pipeline, TapasTokenizer, TapasForQuestionAnswering
# Set the browser tab title. The bare `page_title="...",` line only bound an
# unused one-element tuple; it is a keyword argument of st.set_page_config,
# which has to run before any other Streamlit command.
st.set_page_config(page_title="Sales Forecasting System")
# Preprocessing
def merge(B, C, A):
    """Merge two date-ordered runs into ``A`` in ascending ``Date`` order.

    Args:
        B: Left run, a DataFrame with ``Date`` and ``Sales`` columns.
        C: Right run, same columns as ``B``.
        A: Destination DataFrame. Its ``Date`` and ``Sales`` columns are
            overwritten; it must have ``len(B) + len(C)`` rows.

    Returns:
        ``A`` itself, with ``Date`` converted to datetimes and both columns
        holding the merged rows. On equal dates, rows of ``B`` come first.

    Raises:
        ValueError: If ``A`` does not have ``len(B) + len(C)`` rows.
    """
    # Snapshot both runs before touching A: B and C are usually slices of A,
    # and element-wise chained writes (A[col].iloc[k] = ...) are not
    # guaranteed to reach A at all.
    left = pd.DataFrame({
        'Date': pd.to_datetime(B['Date']).to_numpy(),
        'Sales': B['Sales'].to_numpy(),
    })
    right = pd.DataFrame({
        'Date': pd.to_datetime(C['Date']).to_numpy(),
        'Sales': C['Sales'].to_numpy(),
    })

    # A stable sort of left-then-right reproduces the two-pointer merge with
    # `<=`: ties keep the row from B ahead of the row from C.
    merged = (
        pd.concat([left, right], ignore_index=True)
        .sort_values('Date', kind='mergesort')
        .reset_index(drop=True)
    )
    if len(merged) != len(A):
        raise ValueError("A must have exactly len(B) + len(C) rows")

    A['Date'] = merged['Date'].to_numpy()
    A['Sales'] = merged['Sales'].to_numpy()
    return A
def merge_sort(dataframe):
    """Sort ``dataframe`` by ``Date`` using a top-down merge sort.

    Args:
        dataframe: DataFrame with ``Date`` and ``Sales`` columns.

    Returns:
        The frame returned by ``merge`` for the two sorted halves, or
        ``dataframe`` unchanged when it has fewer than two rows.
    """
    if len(dataframe) <= 1:
        return dataframe

    center = len(dataframe) // 2
    # Each half must itself be sorted before merging; copies keep the halves
    # independent of the frame that merge() writes into.
    left = merge_sort(dataframe.iloc[:center].copy())
    right = merge_sort(dataframe.iloc[center:].copy())
    return merge(left, right, dataframe)
def drop(dataframe):
    """Keep only date/sale columns and discard rows with missing values.

    A column is kept when its name contains "date" or "sale", compared
    case-insensitively. The input frame is not modified.

    Args:
        dataframe: DataFrame whose column labels are strings.

    Returns:
        A new DataFrame with the kept columns and no rows containing NaN.
    """
    wanted = ("date", "sale")

    def is_wanted(label):
        lowered = label.lower()
        return any(token.lower() in lowered for token in wanted)

    keep_mask = [is_wanted(label) for label in dataframe.columns]
    trimmed = dataframe.loc[:, keep_mask]
    return trimmed.dropna()
def date_format(dataframe):
    """Parse the ``Date`` column from ``MM/DD/YYYY`` text into date objects.

    Surrounding whitespace is stripped before parsing. The column is replaced
    in one vectorised assignment; the previous per-row chained assignment
    (``dataframe['Date'][i] = ...``) is not guaranteed to write through and
    only worked for frames with exactly two columns.

    Args:
        dataframe: DataFrame with a string ``Date`` column.

    Returns:
        The same DataFrame object, with ``Date`` holding ``datetime.date``
        values.

    Raises:
        ValueError: If a value does not match ``%m/%d/%Y``.
    """
    stripped = dataframe['Date'].str.strip()
    dataframe['Date'] = pd.to_datetime(stripped, format="%m/%d/%Y").dt.date
    return dataframe
def group_to_three(dataframe):
    """Average ``Sales`` over consecutive 3-day bins.

    The ``Date`` column of the input frame is converted to datetimes in
    place. Bin means are rounded to two decimals; bins whose mean is 0 or
    missing are removed.

    Args:
        dataframe: DataFrame with ``Date`` and ``Sales`` columns.

    Returns:
        A Series of mean sales indexed by the start date of each kept bin.
    """
    dataframe['Date'] = pd.to_datetime(dataframe['Date'])
    three_day_bins = dataframe.groupby([pd.Grouper(key='Date', freq='3D')])
    mean_sales = three_day_bins['Sales'].mean().round(2)
    # Zero means are treated like missing values and dropped with them.
    without_zeros = mean_sales.replace(0, np.nan)
    return without_zeros.dropna()
def series_to_df_exogenous(series):
    """Turn a sales series into a frame with two exogenous difference columns.

    Args:
        series: Series named ``Sales`` whose index is named ``Date``.

    Returns:
        A DataFrame indexed by ``Date`` with columns ``Sales``,
        ``Sales First Difference`` (lag 1) and ``Seasonal First Difference``
        (lag 12). Rows where any of these is missing are dropped, so the
        first 12 observations never appear.
    """
    frame = series.to_frame().reset_index().set_index('Date').dropna()

    # Create the eXogenous values
    sales = frame['Sales']
    frame['Sales First Difference'] = sales - sales.shift(1)
    frame['Seasonal First Difference'] = sales - sales.shift(12)

    return frame.dropna()
def dates_df(dataframe):
    """Return an all-string copy of a date-indexed frame.

    The index is moved into a ``Date`` column formatted like
    ``"January 02, 2021"``, and every column is converted to ``str``.

    Args:
        dataframe: DataFrame with a datetime index named ``Date``.

    Returns:
        A new DataFrame with a default integer index and string columns.
    """
    table = dataframe.reset_index()
    table['Date'] = table['Date'].dt.strftime('%B %d, %Y')
    return table.astype(str)
def get_forecast_period(period):
    """Convert a horizon in days into a number of 3-day forecast steps.

    Args:
        period: Forecast horizon in days.

    Returns:
        ``round(period / 3)``, but never less than 1 for a positive
        ``period``: a 1-day horizon would otherwise round down to zero steps
        and produce an empty forecast.
    """
    steps = round(period / 3)
    if period > 0 and steps < 1:
        return 1
    return steps
def train_test(dataframe):
    """Split a frame into an 80/20 chronological train/test partition.

    Column 0 is the target; all remaining columns are exogenous regressors.

    Args:
        dataframe: DataFrame with the target in the first column.

    Returns:
        Tuple ``(training_y, test_y, test_y_series, training_X, test_X,
        future_X)``, where ``future_X`` is the regressors for every row.
    """
    n = round(len(dataframe) * 0.2)
    # Slice from an explicit split position: with n == 0, `iloc[:-n]` is
    # `iloc[:0]` (empty training set) and `iloc[-n:]` is the whole frame.
    split = len(dataframe) - n

    training_y = dataframe.iloc[:split, 0]
    test_y = dataframe.iloc[split:, 0]
    test_y_series = pd.Series(test_y, index=test_y.index)
    training_X = dataframe.iloc[:split, 1:]
    test_X = dataframe.iloc[split:, 1:]
    future_X = dataframe.iloc[:, 1:]
    return (training_y, test_y, test_y_series, training_X, test_X, future_X)
def test_fitting(dataframe, Exo, trainY):
    """Fit a seasonal auto-ARIMA model on the training split.

    Args:
        dataframe: Unused; kept so existing callers keep working.
        Exo: Exogenous regressors for the training rows.
        trainY: Training target values.

    Returns:
        The model object returned by ``auto_arima``.
    """
    # The search call was left unterminated; only the arguments present in
    # the source are passed, everything else uses auto_arima's defaults.
    return auto_arima(
        X=Exo,
        y=trainY,
        start_p=1,
        start_q=1,
        max_p=3,
        max_q=3,
        m=12,
        start_P=0,
        seasonal=True,
        d=None,
        D=1,
        trace=True,
    )
def forecast_accuracy(forecast, actual):
    """Compute accuracy metrics for a forecast against actual values.

    Inputs are compared by position. They are converted to flat float arrays
    first, so pandas Series with different indexes are not silently aligned
    on labels, and list inputs work too.

    Args:
        forecast: 1-D array-like of predicted values.
        actual: 1-D array-like of observed values, same length.

    Returns:
        Dict with ``mape`` (rounded to 4 decimals), ``rmse`` (rounded to 2),
        ``corr`` (Pearson correlation) and ``min-max`` (one minus the mean of
        element-wise min/max ratios).

    Raises:
        ValueError: If the two inputs differ in length.
    """
    forecast = np.asarray(forecast, dtype=float).ravel()
    actual = np.asarray(actual, dtype=float).ravel()
    if forecast.shape != actual.shape:
        raise ValueError("forecast and actual must have the same length")

    errors = forecast - actual
    mape = np.mean(np.abs(errors) / np.abs(actual)).round(4)  # MAPE
    rmse = (np.mean(errors ** 2) ** .5).round(2)  # RMSE
    corr = np.corrcoef(forecast, actual)[0, 1]  # corr
    mins = np.minimum(forecast, actual)
    maxs = np.maximum(forecast, actual)
    minmax = 1 - np.mean(mins / maxs)  # minmax
    return {'mape': mape, 'rmse': rmse, 'corr': corr, 'min-max': minmax}
def sales_growth(dataframe, fittedValues):
    """Build a forecast table with period-over-period difference and growth.

    Args:
        dataframe: Historical frame with a ``Sales`` column (at least two
            rows when the forecast is non-empty).
        fittedValues: Series of forecasted sales indexed by date.

    Returns:
        DataFrame indexed by ``Date`` with columns ``Sales`` (rounded to 2
        decimals), ``Forecasted Sales First Difference`` and
        ``Forecasted Sales Growth`` (percent). The first row has no previous
        forecast, so it is filled from the last two historical values.
    """
    growth = fittedValues.to_frame().reset_index()
    growth.columns = ("Date", "Sales")
    growth = growth.set_index('Date')
    growth['Sales'] = growth['Sales'].round(2)

    # Calculate and create the column for sales difference and growth
    previous = growth['Sales'].shift(1)
    change = growth['Sales'] - previous
    growth['Forecasted Sales First Difference'] = change.round(2)
    growth['Forecasted Sales Growth'] = ((change / previous) * 100).round(2)

    # Calculate and create the first row for sales difference and growth.
    # Written with a single positional .iloc on the frame: the chained form
    # `growth[col].iloc[0] = ...` may update a temporary copy instead.
    if len(growth) > 0:
        last_actual = dataframe['Sales'].iloc[-1]
        last_change = last_actual - dataframe['Sales'].iloc[-2]
        diff_pos = growth.columns.get_loc('Forecasted Sales First Difference')
        growth_pos = growth.columns.get_loc('Forecasted Sales Growth')
        growth.iloc[0, diff_pos] = round(last_change, 2)
        # NOTE(review): this divides by the latest value, whereas the rows
        # below divide by the previous value; kept as-is, confirm intent.
        growth.iloc[0, growth_pos] = round((last_change / last_actual) * 100, 2)
    return growth
def merge_forecast_data(historical, test, future):
    """Combine actual, predicted and future sales into one date-aligned frame.

    ``pd.merge`` joins exactly two frames, so the three inputs are joined
    pairwise on their date index. The inputs are not modified.

    Args:
        historical: DataFrame (or Series) of actual sales, dated either by
            its index or by a ``Date`` column. Its first value column is
            renamed to ``Actual Sales``.
        test: Series of predictions for the test window, indexed by date.
        future: Series of future forecasts, indexed by date.

    Returns:
        DataFrame indexed by ``Date`` (outer join, sorted) containing
        ``Actual Sales``, ``Predicted Sales`` and ``Future Forecasted Sales``
        plus any additional columns of ``historical``.
    """
    def _labelled(data, label):
        # Normalise one input to a Date-indexed frame with its first column renamed.
        frame = data.to_frame() if isinstance(data, pd.Series) else data
        if 'Date' in frame.columns:
            frame = frame.set_index('Date')
        frame = frame.rename(columns={frame.columns[0]: label})
        return frame.rename_axis('Date')

    merged_dataframe = _labelled(historical, "Actual Sales")
    for data, label in ((test, "Predicted Sales"), (future, "Future Forecasted Sales")):
        merged_dataframe = merged_dataframe.merge(
            _labelled(data, label),
            how='outer',
            left_index=True,
            right_index=True,
        )
    return merged_dataframe.rename_axis('Date')
# TAPAS Model
def load_tapas_model():
    """Create the table question-answering pipeline.

    Loads the tokenizer and model for the
    ``google/tapas-large-finetuned-wtq`` checkpoint and wraps them in a
    ``table-question-answering`` pipeline.

    Returns:
        The pipeline object.
    """
    checkpoint = "google/tapas-large-finetuned-wtq"
    qa_tokenizer = TapasTokenizer.from_pretrained(checkpoint)
    qa_model = TapasForQuestionAnswering.from_pretrained(checkpoint, local_files_only=False)
    return pipeline("table-question-answering", model=qa_model, tokenizer=qa_tokenizer)
# Module-level table-QA pipeline; get_answer() reads this global.
# NOTE(review): this runs at import time, and Streamlit presumably re-executes
# the script on each interaction, which would reload the model every rerun.
# Consider caching the loader; confirm against the deployed Streamlit version.
pipe = load_tapas_model()
def get_answer(table, query):
    """Ask the module-level table-QA pipeline one question about ``table``.

    Args:
        table: Table passed to the pipeline as ``table``.
        query: Question passed to the pipeline as ``query``.

    Returns:
        Whatever the pipeline returns for this table/query pair.
    """
    return pipe(table=table, query=query)
def convert_answer(answer):
    """Reduce a table-QA answer to a number according to its aggregator.

    Args:
        answer: Mapping with an ``aggregator`` key and, for aggregated
            answers, a ``cells`` list of cell strings. Thousands separators
            (",") in cells are ignored.

    Returns:
        * ``SUM``: the sum of the cells as floats.
        * ``AVERAGE``: the mean of the cells, or ``None`` when no cells were
          selected (instead of dividing by zero).
        * ``COUNT``: the number of selected cells. Cells are counted rather
          than parsed, because counted cells need not be numeric (dates,
          names).
        * any other aggregator: ``answer`` unchanged.

    Raises:
        ValueError: If a ``SUM``/``AVERAGE`` cell is not numeric.
    """
    aggregator = answer['aggregator']
    if aggregator not in ('SUM', 'AVERAGE', 'COUNT'):
        return answer

    cells = answer['cells']
    if aggregator == 'COUNT':
        return len(cells)

    values = [float(value.replace(',', '')) for value in cells]
    if aggregator == 'SUM':
        return sum(values)
    if not values:
        return None
    return sum(values) / len(values)
def get_converted_answer(table, query):
    """Answer ``query`` about ``table`` and post-process the result.

    Args:
        table: Table handed to ``get_answer``.
        query: Question handed to ``get_answer``.

    Returns:
        The result of ``convert_answer`` applied to the raw answer.
    """
    raw_answer = get_answer(table, query)
    return convert_answer(raw_answer)
# Web Application
# Page header. The emoji was stored as mis-decoded UTF-8 bytes; restored to
# the chart character it encodes.
st.title("Forecasting Dashboard 📈")
st.subheader("Welcome User, start forecasting by uploading your file in the sidebar!")

# Session States
# Seed each flag only when it is absent so values set earlier in the session
# survive script reruns.
if 'uploaded' not in st.session_state:
    st.session_state.uploaded = False
if 'forecasted' not in st.session_state:
    st.session_state.forecasted = False
# Sidebar Menu
with st.sidebar:
    # TODO Name for product
    st.title("MaLaCast v1.0")
    st.subheader("An intelligent sales forecasting system")
    uploaded_file = st.file_uploader(
        "Upload your store data here to proceed (must atleast contain Date and Sales)",
        type=["csv"],
    )

    # `series` exists only for the current run, so the flag is recomputed on
    # every run rather than left over from an earlier upload; otherwise the
    # forecasting section would run without any data.
    st.session_state.uploaded = False
    if uploaded_file is not None:
        df = pd.read_csv(uploaded_file, parse_dates=True)
        date_found = any('Date' in column for column in df.columns)
        sales_found = any('Sales' in column for column in df.columns)

        if date_found and sales_found:
            st.success("File uploaded successfully!")
            st.write("Your uploaded data:")
            # Preprocess: keep date/sales columns, parse dates, bin to 3 days.
            df = drop(df)
            df = date_format(df)
            series = group_to_three(df)
            st.session_state.uploaded = True
        else:
            # Stop here: a file without both columns must not be reported as
            # a successful upload or passed on to preprocessing.
            st.error('Please upload a csv containing both Date and Sales...')

    with open('sample.csv', 'rb') as f:
        st.download_button("Download our sample CSV", f, file_name='sample.csv')
if st.session_state.uploaded:
    # NOTE(review): MIN_DAYS and MAX_DAYS are not defined anywhere in the
    # visible source; confirm they are declared before this point.
    period = st.slider('How many days would you like to forecast?', min_value=MIN_DAYS, max_value=MAX_DAYS)
    forecast_period = get_forecast_period(period)
    forecast_button = st.button('Start Forecasting')

    if forecast_button or st.session_state.forecasted:
        # Historical data keeps its own name: it is still needed after the
        # forecast table is built, so it must not be overwritten.
        history = series_to_df_exogenous(series)
        training_y, test_y, test_y_series, training_X, test_X, future_X = train_test(history)
        train_test_model = test_fitting(history, training_X, training_y)

        n_periods = round(len(history) * 0.2)
        future_n_periods = forecast_period + n_periods

        fitted, confint = train_test_model.predict(X=test_X, n_periods=n_periods, return_conf_int=True)
        index_of_fc = test_y_series.index

        # make series for plotting purpose
        fitted_series = pd.Series(fitted)
        fitted_series.index = index_of_fc
        lower_series = pd.Series(confint[:, 0], index=index_of_fc)
        upper_series = pd.Series(confint[:, 1], index=index_of_fc)

        # plt.plot(df['Sales'], color='b', label = 'Actual Sales')
        # plt.plot(test_y, color='b')
        # plt.plot(fitted_series, color='r', label = 'Predicted Sales')
        # plt.title("SARIMAX - Forecast of Auto Business Retail Sales VS Actual Sales")
        # plt.legend(loc='upper left', fontsize=8)

        # Future predictions
        frequency = '3D'
        future_fitted, confint = train_test_model.predict(
            X=history.iloc[-future_n_periods:, 1:],
            n_periods=future_n_periods,
            return_conf_int=True,
            freq=frequency,
        )
        future_index_of_fc = pd.date_range(history['Sales'].index[-1], periods=future_n_periods, freq=frequency)

        # make series for future plotting purpose
        future_fitted_series = pd.Series(future_fitted)
        future_fitted_series.index = future_index_of_fc
        future_lower_series = pd.Series(confint[:, 0], index=future_index_of_fc)
        future_upper_series = pd.Series(confint[:, 1], index=future_index_of_fc)

        # TODO Plot Future
        # plt.plot(future_fitted_series, color='darkgreen', label ='Future Forecasted Sales')
        # plt.fill_between(future_lower_series.index,
        # future_lower_series,
        # future_upper_series,
        # color='k', alpha=.15)
        # plt.fill_between(lower_series.index,
        # lower_series,
        # upper_series,
        # color='k', alpha=.15)

        future_sales_growth = sales_growth(history, future_fitted_series)
        # Skip the leading steps that overlap the test window.
        future_sales_growth = future_sales_growth.iloc[n_periods:]
        forecast_table = dates_df(future_sales_growth)

        col = st.columns(2)
        with col[0]:
            col[0].header("Sales Forecast")
            # The forecast table has no 'Data' column; the historical series
            # is the actual sales, passed as a copy so it is not renamed.
            merged_data = merge_forecast_data(history[['Sales']].copy(), fitted_series, future_fitted_series)
        with col[1]:
            col[1].subheader("Forecasted sales in x days")

        st.session_state.forecasted = True

        with st.form("question_form"):
            question = st.text_input('Ask a Question about the Forecasted Data', placeholder="What is the total sales in the month of December?")
            query_button = st.form_submit_button(label='Generate Answer')

        # Only query the model when there is a question to ask; pressing the
        # button with an empty box would send an empty query.
        if question:
            answer = get_converted_answer(forecast_table, question)
            if answer is not None:
                # The second positional argument of st.subheader is the
                # anchor, so the answer has to be part of the text itself.
                st.subheader(f"The answer is: {answer}")
            else:
                st.subheader("Answer is not found in table")
# Hide Streamlit default style
# The literal was duplicated and left unterminated; collapsed to one
# well-formed assignment with the same CSS rule.
# NOTE(review): to take effect the rule presumably needs a <style> wrapper
# and an st.markdown(hide_st_style, unsafe_allow_html=True) call; neither is
# visible here, so confirm before adding them.
hide_st_style = """
footer {visibility: hidden;}
"""