import streamlit as st import pandas as pd from datetime import datetime import numpy as np import pmdarima as pm from pmdarima import auto_arima import torch from transformers import pipeline, TapasTokenizer, TapasForQuestionAnswering st.set_page_config( page_title="Sales Forecasting System", page_icon="📈", layout="wide", initial_sidebar_state="expanded", ) # Preprocessing def merge(B, C, A): i = j = k = 0 # Convert 'Date' columns to datetime.date objects B['Date'] = pd.to_datetime(B['Date']).dt.date C['Date'] = pd.to_datetime(C['Date']).dt.date A['Date'] = pd.to_datetime(A['Date']).dt.date while i < len(B) and j < len(C): if B['Date'].iloc[i] <= C['Date'].iloc[j]: A['Date'].iloc[k] = B['Date'].iloc[i] A['Sales'].iloc[k] = B['Sales'].iloc[i] i += 1 else: A['Date'].iloc[k] = C['Date'].iloc[j] A['Sales'].iloc[k] = C['Sales'].iloc[j] j += 1 k += 1 while i < len(B): A['Date'].iloc[k] = B['Date'].iloc[i] A['Sales'].iloc[k] = B['Sales'].iloc[i] i += 1 k += 1 while j < len(C): A['Date'].iloc[k] = C['Date'].iloc[j] A['Sales'].iloc[k] = C['Sales'].iloc[j] j += 1 k += 1 return A def merge_sort(dataframe): if len(dataframe) > 1: center = len(dataframe) // 2 left = dataframe.iloc[:center] right = dataframe.iloc[center:] merge_sort(left) merge_sort(right) return merge(left, right, dataframe) else: return dataframe def drop (dataframe): def get_columns_containing(dataframe, substrings): return [col for col in dataframe.columns if any(substring.lower() in col.lower() for substring in substrings)] columns_to_keep = get_columns_containing(dataframe, ["date", "sale"]) dataframe = dataframe.drop(columns=dataframe.columns.difference(columns_to_keep)) dataframe = dataframe.dropna() return dataframe def date_format(dataframe): for i, d, s in dataframe.itertuples(): dataframe['Date'][i] = dataframe['Date'][i].strip() for i, d, s in dataframe.itertuples(): new_date = datetime.strptime(dataframe['Date'][i], "%m/%d/%Y").date() dataframe['Date'][i] = new_date return dataframe def group_to_three(dataframe): dataframe['Date'] = pd.to_datetime(dataframe['Date']) dataframe = dataframe.groupby([pd.Grouper(key='Date', freq='3D')])['Sales'].mean().round(2) dataframe = dataframe.replace(0, np.nan).dropna() return dataframe # SARIMAX Model def train_test(dataframe, n): training_y = dataframe.iloc[:-n,0] test_y = dataframe.iloc[-n:,0] test_y_series = pd.Series(test_y, index=dataframe.iloc[-n:, 0].index) training_X = dataframe.iloc[:-n,1:] test_X = dataframe.iloc[-n:,1:] future_X = dataframe.iloc[0:,1:] return (training_y, test_y, test_y_series, training_X, test_X, future_X) def model_fitting(dataframe, Exo): futureModel = pm.auto_arima(dataframe['Sales'], X=Exo, start_p=1, start_q=1, test='adf',min_p=1,min_q=1, max_p=3, max_q=3, m=12, start_P=0, seasonal=True, d=None, D=1, trace=True, error_action='ignore', suppress_warnings=True, stepwise=True) model = futureModel return model def test_fitting(dataframe, Exo, trainY): trainTestModel = auto_arima(X = Exo, y = trainY, start_p=1, start_q=1, test='adf',min_p=1,min_q=1, max_p=3, max_q=3, m=12, start_P=0, seasonal=True, d=None, D=1, trace=True, error_action='ignore', suppress_warnings=True, stepwise=True) model = trainTestModel return model def forecast_accuracy(forecast, actual): mape = np.mean(np.abs(forecast - actual)/np.abs(actual)).round(4) # MAPE rmse = (np.mean((forecast - actual)**2)**.5).round(2) # RMSE corr = np.corrcoef(forecast, actual)[0,1] # corr mins = np.amin(np.hstack([forecast[:,None], actual[:,None]]), axis=1) maxs = np.amax(np.hstack([forecast[:,None], actual[:,None]]), axis=1) minmax = 1 - np.mean(mins/maxs) # minmax return({'mape':mape, 'rmse':rmse, 'corr':corr, 'min-max':minmax}) def sales_growth(dataframe, fittedValues): sales_growth = fittedValues.to_frame() sales_growth = sales_growth.reset_index() sales_growth.columns = ("Date", "Sales") sales_growth = sales_growth.set_index('Date') sales_growth['Sales'] = (sales_growth['Sales']).round(2) #Calculate and create the column for sales difference and growth sales_growth['Forecasted Sales First Difference']=(sales_growth['Sales']-sales_growth['Sales'].shift(1)).round(2) sales_growth['Forecasted Sales Growth']=(((sales_growth['Sales']-sales_growth['Sales'].shift(1))/sales_growth['Sales'].shift(1))*100).round(2) #Calculate and create the first row for sales difference and growth sales_growth['Forecasted Sales First Difference'].iloc[0] = (dataframe['Sales'].iloc[-1]-dataframe['Sales'].iloc[-2]).round(2) sales_growth['Forecasted Sales Growth'].iloc[0]=(((dataframe['Sales'].iloc[-1]-dataframe['Sales'].iloc[-2])/dataframe['Sales'].iloc[-1])*100).round(2) return sales_growth # TAPAS Model model_name = "google/tapas-large-finetuned-wtq" tokenizer = TapasTokenizer.from_pretrained(model_name) model = TapasForQuestionAnswering.from_pretrained(model_name, local_files_only=False) def load_tapas_model(model, tokenizer): pipe = pipeline("table-question-answering", model=model, tokenizer=tokenizer) return pipe pipe = load_tapas_model(model, tokenizer) def get_answer(table, query): answers = pipe(table=table, query=query) return answers def convert_answer(answer): if answer['aggregator'] == 'SUM': cells = answer['cells'] converted = sum(float(value.replace(',', '')) for value in cells) return converted if answer['aggregator'] == 'AVERAGE': cells = answer['cells'] values = [float(value.replace(',', '')) for value in cells] converted = sum(values) / len(values) return converted if answer['aggregator'] == 'COUNT': cells = answer['cells'] converted = sum(int(value.replace(',', '')) for value in cells) return converted else: return answer def get_converted_answer(table, query): converted_answer = convert_answer(get_answer(table, query)) return converted_answer # Web Application st.title("Sales Forecasting Dashboard") st.write("📈 Welcome User, start using the application by uploading your file in the sidebar!") # Session States if 'uploaded' not in st.session_state: st.session_state.uploaded = False # Sidebar Menu with st.sidebar: uploaded_file = st.file_uploader("Upload your Store Data here (must atleast contain Date and Sale)", type=["csv"]) err = 0 if uploaded_file is not None: if uploaded_file.type != 'text/csv': err = 1 st.info('Please upload in CSV format only...') else: st.success("File uploaded successfully!") df = pd.read_csv(uploaded_file, parse_dates=True) st.write("Your uploaded data:") st.write(df) # Data pre-processing df = drop(df) df = date_format(df) merge_sort(df) df = group_to_three(df) st.session_state.uploaded = True with open('sample.csv', 'rb') as f: st.download_button("Download our sample CSV", f, file_name='sample.csv') if (st.session_state.uploaded): st.line_chart(df) forecast_button = st.button( 'Start Forecasting', key='forecast_button', type="primary", ) if (forecast_button): df = df.to_frame() df = df.reset_index() df = df.set_index('Date') df = df.dropna() # Create the eXogenous values df['Sales First Difference'] = df['Sales'] - df['Sales'].shift(1) df['Seasonal First Difference'] = df['Sales'] - df['Sales'].shift(12) df = df.dropna() auto_train_test = train_test(df, 20) training_y, test_y, test_y_series, training_X, test_X, future_X = auto_train_test # Auto_arima to fit the model to forecast future sales future_model = model_fitting(df, future_X) # Auto_arima to check the accuracy of the train test split train_test_model = test_fitting(df, training_X, training_y) # Forecast (testing) n_periods = 20 fitted, confint = train_test_model.predict(X=test_X, n_periods=n_periods, return_conf_int=True) index_of_fc = test_y_series.index # make series for plotting purpose fitted_series = pd.Series(fitted) fitted_series.index=index_of_fc lower_series = pd.Series(confint[:, 0], index=index_of_fc) upper_series = pd.Series(confint[:, 1], index=index_of_fc) test_y, predictions = np.array(test_y), np.array(fitted) forecast_accuracy(predictions, test_y) # Forecast (actual) n_periods = 36 freq='3D' future_fitted, confint = future_model.predict(X=df.iloc[-n_periods:,1:], n_periods=n_periods, return_conf_int=True, freq=freq) future_index_of_fc = pd.date_range(df['Sales'].index[-1], periods = n_periods, freq=freq) # make series for plotting purpose future_fitted_series = pd.Series(future_fitted) future_fitted_series.index=future_index_of_fc future_lower_series = pd.Series(confint[:, 0], index=future_index_of_fc) future_upper_series = pd.Series(confint[:, 1], index=future_index_of_fc) # Streamlit plot # st.title("Forecasted Sales") # st.line_chart(df['Sales'][-50:]) # st.line_chart(future_fitted_series, use_container_width=True) # st.area_chart(pd.concat([future_lower_series, future_upper_series], axis=1), color="#808080") auto_sales_growth = sales_growth(df, future_fitted_series) df = auto_sales_growth df = df.reset_index() df['Date'] = df['Date'].dt.strftime('%B %d, %Y') df[df.columns] = df[df.columns].astype(str) st.write("Forecasted sales in the next 3 months") st.write(df) with st.form("question_form"): # Add a text input for the question question = st.text_input('Ask a Question about the Forecasted Data', placeholder="What is the total sales in the month of December?") # Add a button to submit the form query_button = st.form_submit_button(label='Generate Answer') if query_button: answer = get_converted_answer(df, question) st.write("The answer is:", answer)