import json
import logging
import re

import gradio as gr
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import xgboost as xgb
from openai import OpenAI
from plotly.subplots import make_subplots
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
# Configure logging
logging.basicConfig(
    filename='demand_forecasting.log',  # Adjust the log file name here
    filemode='a',
    format='[%(asctime)s] [%(levelname)s] [%(filename)s] [%(lineno)s:%(funcName)s()] %(message)s',
    datefmt='%Y-%b-%d %H:%M:%S'
)
LOGGER = logging.getLogger(__name__)

log_level_env = 'INFO'  # Adjust the log level here
log_level_dict = {
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO,
    'WARNING': logging.WARNING,
    'ERROR': logging.ERROR,
    'CRITICAL': logging.CRITICAL
}
# Fall back to INFO if an unknown level name is configured
log_level = log_level_dict.get(log_level_env, logging.INFO)
LOGGER.setLevel(log_level)

class DemandForecasting:
    def __init__(self):
        # OpenAI() reads the API key from the OPENAI_API_KEY environment variable
        self.client = OpenAI()

    def get_column(self, train_csv_path: str) -> str:
        """Return the CSV's column names as a comma-separated string."""
        # Load the training data from the specified CSV file
        train_df = pd.read_csv(train_csv_path)
        column_names = train_df.columns.tolist()
        # Join the names so they display cleanly in the Gradio textbox
        return ", ".join(column_names)
    def load_data(self, train_csv_path: str) -> pd.DataFrame:
        """
        Load training data from a CSV file.

        Args:
            train_csv_path (str): Path to the training CSV file.

        Returns:
            pd.DataFrame: DataFrame containing the training data, or None on failure.
        """
        try:
            # Load the training data from the specified CSV file
            train_df = pd.read_csv(train_csv_path)
            return train_df
        except Exception as e:
            # Log an error message if an exception occurs during data loading
            LOGGER.error(f"Error loading data: {e}")
            return None
    def find_date_column(self, df_data: pd.DataFrame) -> str:
        """
        Find the first column containing date-like values in the DataFrame.

        Args:
            df_data (pd.DataFrame): Input DataFrame.

        Returns:
            str: Name of the first column whose values parse as dates, or None if none does.
        """
        for column in df_data.columns:
            # Check if the column can be converted to datetime
            try:
                pd.to_datetime(df_data[column])
                return column
            except (ValueError, TypeError):
                pass
        # Return None if no date column is found
        return None
    def preprocess_data(self, df_data: pd.DataFrame, list_columns: list, target_column: str) -> pd.DataFrame:
        """
        Extract date-related features from the DataFrame.

        Args:
            df_data (pd.DataFrame): Input DataFrame.
            list_columns (list): List of column names to retain.
            target_column (str): Name of the target column.

        Returns:
            pd.DataFrame: Transformed DataFrame.
        """
        # Make a copy of the input DataFrame to avoid modifying the original data
        df_data = df_data.copy()
        # Build a new list instead of appending in place, so the caller's list is untouched
        list_columns = list_columns + [target_column]
        # Drop columns not in list_columns
        columns_to_drop = [col for col in df_data.columns if col not in list_columns]
        df_data.drop(columns=columns_to_drop, inplace=True)
        # Find the date column
        date_column = self.find_date_column(df_data)
        if date_column is None:
            raise ValueError("No date column found in the provided list of columns.")
        LOGGER.info(f"Date column detected: {date_column}")
        # Parse date information now that a valid date column is found
        df_data[date_column] = pd.to_datetime(df_data[date_column])  # Convert the date column to datetime format
        df_data['day'] = df_data[date_column].dt.day      # Extract day of the month
        df_data['month'] = df_data[date_column].dt.month  # Extract month
        df_data['year'] = df_data[date_column].dt.year    # Extract year
        # Cyclical encoding for months
        df_data['month_sin'] = np.sin(2 * np.pi * df_data['month'] / 12)  # Cyclical sine encoding for month
        df_data['month_cos'] = np.cos(2 * np.pi * df_data['month'] / 12)  # Cyclical cosine encoding for month
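        # Why cyclical encoding: a plain month number places December (12) and
        # January (1) eleven units apart even though they are adjacent in time;
        # the unit-circle mapping keeps neighbours close, e.g.
        # December -> (sin, cos) = (0.0, 1.0) and January -> (0.5, ~0.87).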
        # Day of the week
        df_data['day_of_week'] = df_data[date_column].dt.weekday  # 0 = Monday, 6 = Sunday
        # Week of the year
        df_data['week_of_year'] = df_data[date_column].dt.isocalendar().week.astype(int)  # Extract week of the year as integer
        df_data.drop(columns=[date_column], inplace=True)  # Drop the original date column
        return df_data
    def train_model(self, train: pd.DataFrame, target_column, list_columns) -> tuple:
        """
        Train an XGBoost model using the provided training data.

        Args:
            train (pd.DataFrame): DataFrame containing the preprocessed training data.
            target_column (str): Name of the target column.
            list_columns (list): Retained for interface compatibility; not used here.

        Returns:
            tuple: The trained model, true validation labels, predicted validation
            labels, and a validation summary string.
        """
        try:
            # Extract features and target variable
            X = train.drop(columns=[target_column])
            y = train[target_column]
            # Note: a random split (like cross-validation) leaks future rows into
            # training; see the time-ordered split sketched after this method.
            X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=333)
            # Convert data into DMatrix format for XGBoost
            dtrain = xgb.DMatrix(X_train, label=y_train)
            dval = xgb.DMatrix(X_val, label=y_val)
            # Parameters for XGBoost
            param = {
                'max_depth': 9,                  # Maximum tree depth
                'eta': 0.3,                      # Learning rate
                'objective': 'reg:squarederror'  # Squared-error regression objective
            }
            num_round = 60  # Number of boosting rounds
            # Train the model
            model_xgb = xgb.train(param, dtrain, num_round)
            # Validate the model
            y_val_pred = model_xgb.predict(dval)  # Predict validation set labels
            # Calculate mean squared error
            mse = mean_squared_error(y_val, y_val_pred)
            # Report validation RMSE
            validation = f"Validation RMSE: {np.sqrt(mse)}"
            # Return trained model, true and predicted validation labels, and the summary
            return model_xgb, y_val, y_val_pred, validation
        except Exception as e:
            # Log an error message if an exception occurs during model training
            LOGGER.error(f"Error training model: {e}")
            # Return None for all four outputs in case of an error
            return None, None, None, None
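
    # A minimal sketch of a leakage-free alternative to the random split above,
    # assuming rows are sorted chronologically (not used by the app): hold out
    # the most recent 20% of rows instead of sampling at random.
    #
    #     split_idx = int(len(X) * 0.8)
    #     X_train, X_val = X.iloc[:split_idx], X.iloc[split_idx:]
    #     y_train, y_val = y.iloc[:split_idx], y.iloc[split_idx:]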
    def plot_line_graph(self, y_val, y_val_pred):
        """Plot actual vs. predicted values as lines over the validation index."""
        # Take only the first 1000 data points
        num_data_points = 1000
        y_val = y_val[:num_data_points]
        y_val_pred = y_val_pred[:num_data_points]
        # Create Plotly figure
        fig = make_subplots(rows=1, cols=1)
        # Add actual vs predicted traces to the figure (line plot)
        fig.add_trace(go.Scatter(x=np.arange(len(y_val)), y=y_val, mode='lines', name='Actual'), row=1, col=1)
        fig.add_trace(go.Scatter(x=np.arange(len(y_val)), y=y_val_pred, mode='lines', name='Predicted'), row=1, col=1)
        # Update layout
        fig.update_layout(title='Actual vs Predicted Over Time', xaxis_title='Time', yaxis_title='Value')
        # Return the figure; Gradio renders it in the Plot component, so no fig.show() is needed
        return fig
    def plot_scatter_plot(self, y_val, y_val_pred):
        """Plot actual vs. predicted values as markers over the validation index."""
        # Take only the first 1000 data points
        num_data_points = 1000
        y_val = y_val[:num_data_points]
        y_val_pred = y_val_pred[:num_data_points]
        # Create Plotly figure
        fig = make_subplots(rows=1, cols=1)
        # Add scatter traces for actual vs predicted
        fig.add_trace(go.Scatter(x=np.arange(len(y_val)), y=y_val, mode='markers', name='Actual', marker=dict(color='blue', size=8)), row=1, col=1)
        fig.add_trace(go.Scatter(x=np.arange(len(y_val)), y=y_val_pred, mode='markers', name='Predicted', marker=dict(color='orange', size=8)), row=1, col=1)
        # Update layout
        fig.update_layout(title='Actual vs Predicted Over Time (Scatter Plot)', xaxis_title='Time', yaxis_title='Value')
        # Return the figure; Gradio renders it in the Plot component, so no fig.show() is needed
        return fig
    def predict_sales_for_date(self, input_data, model: xgb.Booster) -> str:
        """
        Predict the target value for a specific date using the trained model.

        Args:
            input_data (dict): Feature values parsed from the transcribed audio,
                including a date string in 'dd-mm-YYYY' format.
            model (xgb.Booster): Trained XGBoost model.

        Returns:
            str: Formatted prediction result, or None on failure.
        """
        try:
            input_features = pd.DataFrame([input_data])
            # Find the key whose value matches the 'dd-mm-yyyy' date pattern
            date_column = None
            for key, value in input_data.items():
                if isinstance(value, str) and re.match(r'\d{2}-\d{2}-\d{4}', value):
                    date_column = key
            if date_column:
                # parse_text returns dates as dd-mm-YYYY, so parse day-first
                date_input = pd.to_datetime(input_features[date_column], dayfirst=True)
                # Recreate the same date features used during training
                input_features['day'] = date_input.dt.day      # Day of the month
                input_features['month'] = date_input.dt.month  # Month
                input_features['year'] = date_input.dt.year    # Year
                input_features['month_sin'] = np.sin(2 * np.pi * input_features['month'] / 12)  # Cyclical sine encoding
                input_features['month_cos'] = np.cos(2 * np.pi * input_features['month'] / 12)  # Cyclical cosine encoding
                input_features['day_of_week'] = date_input.dt.weekday  # 0 = Monday, 6 = Sunday
                input_features['week_of_year'] = date_input.dt.isocalendar().week.astype(int)  # Week of the year as integer
                input_features.drop(columns=[date_column], inplace=True)
            # Convert input features to DMatrix format
            dinput = xgb.DMatrix(input_features)
            # Make predictions using the trained model
            predicted_sales = model.predict(dinput)[0]
            # Format the predicted value alongside the input date
            predicted_result = f"Date: {input_data[date_column]} Predicted Value: {predicted_sales}"
            return predicted_result
        except Exception as e:
            # Log an error message if an exception occurs during prediction
            LOGGER.error(f"Error predicting sales: {e}")
            return None
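
    # A minimal usage sketch (hypothetical values; 'store', 'item', and 'date'
    # are illustrative keys, assuming matching columns exist in the training CSV):
    #
    #     input_data = {"store": 1, "item": 3, "date": "15-06-2018"}
    #     result = forecaster.predict_sales_for_date(input_data, trained_model)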
    def audio_to_text(self, audio_path):
        """Transcribe the recorded audio to English text via the OpenAI Whisper API."""
        with open(audio_path, "rb") as audio_file:
            transcription = self.client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                language="en")
        LOGGER.info(f"Transcription: {transcription.text}")
        return transcription.text
    def parse_text(self, text, column_list):
        """Extract feature values for the given columns from transcribed text using a chat model."""
        # Define the prompt for the model
        conversation = [
            {"role": "system", "content": ""},
            {"role": "user", "content": f"""Extract the values for this given column list: {column_list} from the given text. All values should be of integer data type. If a date appears in the given text, format it as dd-mm-YYYY.
text```{text}```
The text may contain differently named keys and values; use cosine similarity to map them to the column list.
The column names should be the keys.
Return the result in JSON format:
"""
            }]
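        # A hypothetical example: for column_list ['store', 'item', 'date'] and the
        # utterance "store two, item fifteen, first of March 2018", the expected
        # reply is {"store": 2, "item": 15, "date": "01-03-2018"}.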
        # Generate a response from the chat model
        chat_completion = self.client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=conversation,
            max_tokens=500,
            temperature=0,
            n=1,
            stop=None,
            response_format={"type": "json_object"},
        )
        # Extract the generated text from the API response
        generated_text = chat_completion.choices[0].message.content
        LOGGER.info(f"Parsed values: {generated_text}")
        # Parse the JSON string returned by the model
        try:
            json_data = json.loads(generated_text)
        except Exception as e:
            LOGGER.error(f"Error decoding model output as JSON: {e}")
            return None
        return json_data
    def main(self, train_csv_path: str, audio_path, target_column, column_list) -> tuple:
        """
        Execute the full demand forecasting pipeline.

        Args:
            train_csv_path (str): Path to the training CSV file.
            audio_path (str): Path to the recorded audio file.
            target_column (str): Name of the target column.
            column_list (str): Comma-separated column names to use as features.

        Returns:
            tuple: Validation summary, line graph, scatter plot, and prediction result.
        """
        try:
            # Split the comma-separated string into a list of column names
            column_list = [col.strip() for col in column_list.split(",")]
            text = self.audio_to_text(audio_path)
            input_data = self.parse_text(text, column_list)
            # Load the training data
            train_data = self.load_data(train_csv_path)
            # Preprocess the training data
            train_df = self.preprocess_data(train_data, column_list, target_column)
            # Train the model and get validation predictions
            trained_model, y_val, y_val_pred, validation = self.train_model(train_df, target_column, column_list)
            # Plot interactive evaluations for training
            line_graph = self.plot_line_graph(y_val, y_val_pred)
            scatter_plot = self.plot_scatter_plot(y_val, y_val_pred)
            # Predict the target value for the spoken date using the trained model
            predicted_value = self.predict_sales_for_date(input_data, trained_model)
            return validation, line_graph, scatter_plot, predicted_value
        except Exception as e:
            # Log an error message if an exception occurs in the pipeline
            LOGGER.error(f"Error in main function: {e}")
            return None, None, None, None
    def gradio_interface(self):
        """Build and launch the Gradio UI."""
        with gr.Blocks(css="style.css", theme="freddyaboulton/test-blue") as demo:
            gr.HTML("""<center><h1 style="color:#fff">Demand Forecasting</h1></center>""")
            with gr.Row():
                with gr.Column(scale=0.50):
                    train_csv = gr.File(elem_classes="uploadbutton")
                with gr.Column(scale=0.50):
                    column_list = gr.Textbox(label="Column List")
            with gr.Row():
                with gr.Column(scale=0.50):
                    audio_path = gr.Audio(sources=["microphone"], type="filepath")
            with gr.Row():
                with gr.Column(scale=0.50):
                    selected_column = gr.Textbox(label="Selected Columns")
                with gr.Column(scale=0.50):
                    target_column = gr.Textbox(label="Target Column")
            with gr.Row():
                validation = gr.Textbox(label="Validation")
            with gr.Row():
                predicted_result = gr.Textbox(label="Predicted Result")
            with gr.Row():
                line_plot = gr.Plot()
            with gr.Row():
                scatter_plot = gr.Plot()
            # Populate the column list textbox when a CSV is uploaded
            train_csv.upload(self.get_column, train_csv, column_list)
            # Run the full pipeline when the user stops recording
            audio_path.stop_recording(
                self.main,
                [train_csv, audio_path, target_column, selected_column],
                [validation, line_plot, scatter_plot, predicted_result])
        demo.launch(debug=True)

if __name__ == "__main__":
    demand = DemandForecasting()
    demand.gradio_interface()