robertselvam's picture
Update app.py
ab23b37 verified
import pandas as pd
from io import StringIO
import pandas as pd
import numpy as np
import xgboost as xgb
from math import sqrt
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
import plotly.express as px
import logging
from datetime import datetime
import plotly.graph_objects as go
import numpy as np
import matplotlib.pyplot as plt
import plotly.graph_objs as go
from plotly.subplots import make_subplots
from matplotlib import pyplot
import whisper
from openai import AzureOpenAI
from openai import OpenAI
import json
import re
import gradio as gr
# Configure logging
logging.basicConfig(
filename='demand_forecasting.log', # You can adjust the log file name here
filemode='a',
format='[%(asctime)s] [%(levelname)s] [%(filename)s] [%(lineno)s:%(funcName)s()] %(message)s',
datefmt='%Y-%b-%d %H:%M:%S'
)
LOGGER = logging.getLogger(__name__)
log_level_env = 'INFO' # You can adjust the log level here
log_level_dict = {
'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR,
'CRITICAL': logging.CRITICAL
}
if log_level_env in log_level_dict:
log_level = log_level_dict[log_level_env]
else:
log_level = log_level_dict['INFO']
LOGGER.setLevel(log_level)
class DemandForecasting:
def __init__(self):
self.client = OpenAI()
def get_column(self,train_csv_path: str):
# Load the training data from the specified CSV file
train_df = pd.read_csv(train_csv_path)
column_names = train_df.columns.tolist()
return column_names
def load_data(self, train_csv_path: str) -> pd.DataFrame:
"""
Load training data from a CSV file.
Args:
train_csv_path (str): Path to the training CSV file.
Returns:
pd.DataFrame: DataFrame containing the training data.
"""
try:
# Load the training data from the specified CSV file
train_df = pd.read_csv(train_csv_path)
# Return a tuple containing the training DataFrame
return train_df
except Exception as e:
# Log an error message if an exception occurs during data loading
LOGGER.error(f"Error loading data: {e}")
# Return None
return None
def find_date_column(self, df_data: pd.DataFrame) -> str:
"""
Find the column containing date-type values from the DataFrame.
Args:
- df_data (pd.DataFrame): Input DataFrame.
Returns:
- str: Name of the column containing date-type values.
"""
for column in df_data.columns:
# Check if the column can be converted to datetime
try:
pd.to_datetime(df_data[column])
return column
except ValueError:
pass
# Return None if no date column is found
return None
def preprocess_data(self, df_data: pd.DataFrame, list_columns: list, target_column: str) -> pd.DataFrame:
"""
Transform date-related data in the DataFrame.
Args:
- df_data (pd.DataFrame): Input DataFrame.
- list_columns (list): List of column names to retain.
- target_column (str): Name of the target column.
Returns:
- pd.DataFrame: Transformed DataFrame.
"""
# Make a copy of the input DataFrame to avoid modifying the original data
df_data = df_data.copy()
list_columns.append(target_column)
# Drop columns not in list_columns
columns_to_drop = [col for col in df_data.columns if col not in list_columns]
df_data.drop(columns=columns_to_drop, inplace=True)
# Find the date column
date_column = self.find_date_column(df_data)
if date_column is None:
raise ValueError("No date column found in the provided list of columns.")
else:
print("date_column", date_column)
# Parse date information only if a valid date column is found
df_data[date_column] = pd.to_datetime(df_data[date_column]) # Convert 'date' column to datetime format
df_data['day'] = df_data[date_column].dt.day # Extract day of the month
df_data['month'] = df_data[date_column].dt.month # Extract month
df_data['year'] = df_data[date_column].dt.year # Extract year
# Cyclical Encoding for Months
df_data['month_sin'] = np.sin(2 * np.pi * df_data['month'] / 12) # Cyclical sine encoding for month
df_data['month_cos'] = np.cos(2 * np.pi * df_data['month'] / 12) # Cyclical cosine encoding for month
# Day of the Week
df_data['day_of_week'] = df_data[date_column].dt.weekday # Extract day of the week (0 = Monday, 6 = Sunday)
# Week of the Year
df_data['week_of_year'] = df_data[date_column].dt.isocalendar().week.astype(int) # Extract week of the year as integer
df_data.drop(columns=[date_column], axis=1, inplace=True) # Drop the original date column
return df_data
def train_model(self, train: pd.DataFrame, target_column, list_columns) -> tuple:
"""
Train an XGBoost model using the provided training data.
Args:
- train (pd.DataFrame): DataFrame containing training data.
Returns:
- tuple: A tuple containing the trained model, true validation labels, and predicted validation labels.
"""
try:
# Extract features and target variable
X = train.drop(columns=[target_column])
y = train[target_column]
# Cannot use cross validation because it will use future data
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=333)
# Convert data into DMatrix format for XGBoost
dtrain = xgb.DMatrix(X_train, label=y_train)
dval = xgb.DMatrix(X_val, label=y_val)
# Parameters for XGBoost
param = {
'max_depth': 9,
'eta': 0.3,
'objective': 'reg:squarederror'
}
num_round = 60
# Train the model
model_xgb = xgb.train(param, dtrain, num_round)
# Validate the model
y_val_pred = model_xgb.predict(dval) # Predict validation set labels
# Calculate mean squared error
mse = mean_squared_error(y_val, y_val_pred)
# Print validation RMSE
validation = f"Validation RMSE: {np.sqrt(mse)}"
# Return trained model, true validation labels, and predicted validation labels
return model_xgb, y_val, y_val_pred, validation
except Exception as e:
# Log an error message if an exception occurs during model training
LOGGER.error(f"Error training model: {e}")
# Return None for all outputs in case of an error
return None, None, None
def plot_line_graph(self, y_val, y_val_pred):
# Take only the first 1000 data points
num_data_points = 1000
y_val = y_val[:num_data_points]
y_val_pred = y_val_pred[:num_data_points]
# Create Plotly figure
fig = make_subplots(rows=1, cols=1)
# Add actual vs predicted traces to the figure (line plot)
fig.add_trace(go.Scatter(x=np.arange(len(y_val)), y=y_val, mode='lines', name='Actual'), row=1, col=1)
fig.add_trace(go.Scatter(x=np.arange(len(y_val)), y=y_val_pred, mode='lines', name='Predicted'), row=1, col=1)
# Update layout
fig.update_layout(title='Actual vs Predicted Over Time', xaxis_title='Time', yaxis_title='Value')
# Show interactive plot
fig.show()
return fig
def plot_scatter_plot(self, y_val, y_val_pred):
# Take only the first 1000 data points
num_data_points = 1000
y_val = y_val[:num_data_points]
y_val_pred = y_val_pred[:num_data_points]
# Create Plotly figure
fig = make_subplots(rows=1, cols=1)
# Add scatter plots for actual vs predicted (scatter plot)
fig.add_trace(go.Scatter(x=np.arange(len(y_val)), y=y_val, mode='markers', name='Actual', marker=dict(color='blue', size=8)), row=1, col=1)
fig.add_trace(go.Scatter(x=np.arange(len(y_val)), y=y_val_pred, mode='markers', name='Predicted', marker=dict(color='orange', size=8)), row=1, col=1)
# Update layout
fig.update_layout(title='Actual vs Predicted Over Time (Scatter Plot)', xaxis_title='Time', yaxis_title='Value')
# Show interactive plot
fig.show()
return fig
def predict_sales_for_date(self, input_data, model: xgb.Booster) -> float:
"""
Predict the sales for a specific date using the trained model.
Args:
- date_input (str): Date for which sales prediction is needed (in 'YYYY-MM-DD' format).
- model (xgb.Booster): Trained XGBoost model.
- features (pd.DataFrame): DataFrame containing features for the date.
Returns:
- float: Predicted sales value.
"""
try:
input_features = pd.DataFrame([input_data])
# Regular expression pattern for date in the format 'dd-mm-yyyy'
for key, value in input_data.items():
if isinstance(value, str) and re.match(r'\d{2}-\d{2}-\d{4}', value):
date_column = key
if date_column:
# # Assuming date_input is a datetime object
date_input = pd.to_datetime(input_features[date_column])
# Extract day of the month
input_features['day'] = date_input.dt.day
# Extract month
input_features['month'] = date_input.dt.month
# Extract year
input_features['year'] = date_input.dt.year
# Cyclical sine encoding for month
input_features['month_sin'] = np.sin(2 * np.pi * input_features['month'] / 12)
# Cyclical cosine encoding for month
input_features['month_cos'] = np.cos(2 * np.pi * input_features['month'] / 12)
# Extract day of the week (0 = Monday, 6 = Sunday)
input_features['day_of_week'] = date_input.dt.weekday
# Extract week of the year as integer
input_features['week_of_year'] = date_input.dt.isocalendar().week
input_features.drop(columns=[date_column], inplace=True)
# Convert input features to DMatrix format
dinput = xgb.DMatrix(input_features)
# Make predictions using the trained model
predicted_sales = model.predict(dinput)[0]
# Print the predicted sales value
predicted_result = f"""Date: {input_data[str(date_column)]} Predicted Value: {predicted_sales}"""
# Return the predicted sales value
return predicted_result
except Exception as e:
# Log an error message if an exception occurs during sales prediction
LOGGER.error(f"Error predicting sales: {e}")
# Return None in case of an error
return None
def audio_to_text(self, audio_path):
audio_file= open(audio_path, "rb")
transcription = self.client.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
language="en")
print(transcription.text)
return transcription.text
def parse_text(self, text, column_list):
# Define the prompt or input for the model
conversation =[{"role": "system", "content": ""},
{"role": "user", "content":f""" Extract the values for this given column list:{column_list}, from the given text. all values should be integer data type. if date in given text, the date format should be in dd-mm-YYYY.
text```{text}```
the text may contains other name key and values, use consine similarity to map with column list.
the column names should be keys.
return result should be in JSON format:
"""
}]
# Generate a response from the GPT-3 model
chat_completion = self.client.chat.completions.create(
model = "gpt-3.5-turbo",
messages = conversation,
max_tokens=500,
temperature=0,
n=1,
stop=None,
response_format={ "type": "json_object" },
)
# Extract the generated text from the API response
generated_text = chat_completion.choices[0].message.content
print(generated_text)
# # Assuming jsonString is your JSON string
try:
json_data = json.loads(generated_text)
except Exception as e:
return e
# print("parse_text",json_data)
return json_data
def main(self, train_csv_path: str, audio_path, target_column, column_list) -> None:
"""
Main function to execute the demand forecasting pipeline.
Args:
- train_csv_path (str): Path to the training CSV file.
- date (str): Date for which sales prediction is needed (in 'YYYY-MM-DD' format).
"""
try:
# Split the string by comma and convert it into a list
column_list = column_list.split(",")
text = self.audio_to_text(audio_path)
input_data = self.parse_text(text, column_list)
#load data
train_data = self.load_data(train_csv_path)
#preprocess the train data
train_df = self.preprocess_data(train_data, column_list, target_column)
# Train model and get validation predictions
trained_model, y_val, y_val_pred, validation = self.train_model(train_df, target_column, column_list)
# Plot interactive evaluation for training
line_graph = self.plot_line_graph(y_val, y_val_pred)
scatter_plot = self.plot_scatter_plot(y_val, y_val_pred)
# Predict sales for the specified date using the trained model
predicted_value = self.predict_sales_for_date(input_data, trained_model)
return validation, line_graph, scatter_plot, predicted_value
except Exception as e:
# Log an error message if an exception occurs in the main function
LOGGER.error(f"Error in main function: {e}")
def gradio_interface(self):
with gr.Blocks(css="style.css", theme="freddyaboulton/test-blue") as demo:
gr.HTML("""<center><h1 style="color:#fff">Demand Forecasting</h1></center>""")
with gr.Row():
with gr.Column(scale=0.50):
train_csv = gr.File(elem_classes="uploadbutton")
with gr.Column(scale=0.50):
column_list = gr.Textbox(label="Column List")
with gr.Row():
with gr.Column(scale=0.50):
audio_path = gr.Audio(sources=["microphone"], type="filepath")
with gr.Row():
with gr.Column(scale=0.50):
selected_column = gr.Textbox(label="Select column")
with gr.Column(scale=0.50):
target_column = gr.Textbox(label="target column")
with gr.Row():
validation = gr.Textbox(label="Validation")
with gr.Row():
predicted_result = gr.Textbox(label="Predicted Result")
with gr.Row():
line_plot = gr.Plot()
with gr.Row():
scatter_plot = gr.Plot()
train_csv.upload(self.get_column, train_csv, column_list)
audio_path.stop_recording(self.main, [train_csv, audio_path, target_column, selected_column], [validation, line_plot, scatter_plot, predicted_result])
demo.launch(debug=True)
if __name__ == "__main__":
demand = DemandForecasting()
demand.gradio_interface()