Spaces:
Sleeping
Sleeping
import pandas as pd | |
import numpy as np | |
import matplotlib.pyplot as plt | |
from sklearn.preprocessing import MinMaxScaler | |
from sklearn.metrics import mean_squared_error, r2_score | |
from tensorflow.keras.models import load_model | |
import streamlit as st | |
# Step 1: Load the datasets | |
def load_data(): | |
options_path = 'BANKNIFTY_Option.csv' | |
futures_path = 'BANKNIFTY_Future.csv' | |
options_data = pd.read_csv(options_path) | |
futures_data = pd.read_csv(futures_path) | |
return options_data, futures_data | |
# Step 2: Preprocessing and Merging | |
def preprocess_data(options_data, futures_data): | |
options_data['lasttradetime'] = pd.to_datetime(options_data['lasttradetime'], unit='s') | |
futures_data['lasttradetime'] = pd.to_datetime(futures_data['lasttradetime'], unit='s') | |
# Merge datasets on lasttradetime | |
merged_data = pd.merge(options_data, futures_data, on='lasttradetime', how='inner', suffixes=('_options', '_futures')) | |
# Create new features | |
merged_data['price_diff'] = merged_data['close_options'] - merged_data['close_futures'] | |
merged_data['volume_ratio'] = merged_data['tradedqty'] / merged_data['volume'] | |
merged_data['openinterest_diff'] = merged_data['openinterest_options'] - merged_data['openinterest_futures'] | |
# Drop unnecessary columns | |
merged_data = merged_data[['lasttradetime', 'close_options', 'price_diff', 'volume_ratio', 'openinterest_diff']] | |
merged_data = merged_data.set_index('lasttradetime') | |
return merged_data | |
# Step 3: Load Pre-Trained Model and Scaler | |
def load_trained_model_and_scaler(): | |
model = load_model('banknifty_model.h5') | |
scaler = np.load('scaler.npy', allow_pickle=True).item() | |
return model, scaler | |
# Step 4: Predict Future Prices | |
def predict_future_prices(model, scaler, X, last_date, steps=5): | |
last_sequence = X[-1:, :, :] | |
future_forecast = [] | |
future_dates = pd.date_range(start=last_date, periods=steps + 1, freq='5min')[1:] | |
for _ in range(steps): | |
next_pred = model.predict(last_sequence)[0, 0] | |
future_forecast.append(next_pred) | |
next_input = np.concatenate([last_sequence[:, 1:, :], np.array([[[next_pred, 0, 0, 0]]])], axis=1) | |
last_sequence = next_input | |
future_forecast_rescaled = scaler.inverse_transform( | |
np.hstack((np.array(future_forecast).reshape(-1, 1), np.zeros((steps, X.shape[2] - 1)))) | |
)[:, 0] | |
return future_forecast_rescaled, future_dates | |
# Step 5: Evaluate Model | |
def evaluate_model(model, X_test, y_test, scaler): | |
y_pred = model.predict(X_test) | |
y_pred_rescaled = scaler.inverse_transform( | |
np.hstack((y_pred, np.zeros((y_pred.shape[0], X_test.shape[2] - 1)))) | |
)[:, 0] | |
y_test_rescaled = scaler.inverse_transform( | |
np.hstack((y_test.reshape(-1, 1), np.zeros((y_test.shape[0], X_test.shape[2] - 1)))) | |
)[:, 0] | |
mse = mean_squared_error(y_test_rescaled, y_pred_rescaled) | |
r2 = r2_score(y_test_rescaled, y_pred_rescaled) | |
# Calculate accuracy | |
accuracy = 1 - (np.abs(y_test_rescaled - y_pred_rescaled).mean() / y_test_rescaled.mean()) | |
return mse, r2, accuracy | |
# Streamlit App | |
def main(): | |
st.title("Bank Nifty Options & Futures Forecasting") | |
# Load and preprocess data | |
options_data, futures_data = load_data() | |
merged_data = preprocess_data(options_data, futures_data) | |
st.write("### Merged Dataset") | |
st.dataframe(merged_data.head(10)) | |
# Feature Scaling | |
model, scaler = load_trained_model_and_scaler() | |
scaled_data = scaler.transform(merged_data) | |
# Create Sequences | |
time_steps = 72 | |
X = np.array([scaled_data[i:i + time_steps] for i in range(len(scaled_data) - time_steps)]) | |
y = scaled_data[time_steps:, 0] | |
# Evaluate Model | |
mse, r2, accuracy = evaluate_model(model, X, y, scaler) | |
st.write(f"### Model Evaluation") | |
st.write(f"Mean Squared Error (MSE): {mse:.2f}") | |
st.write(f"R² Score: {r2:.2f}") | |
st.write(f"Accuracy: {accuracy * 100:.2f}%") | |
# Predict Future Prices | |
st.write("### Predicting Future Prices...") | |
last_date = merged_data.index[-1] | |
steps = st.slider("Select Number of Future Steps to Predict", min_value=1, max_value=20, value=5) | |
future_prices, future_dates = predict_future_prices(model, scaler, X, last_date, steps) | |
st.write("### Predicted Future Prices") | |
for date, price in zip(future_dates, future_prices): | |
st.write(f"{date}: {price:.2f}") | |
# Plot Results | |
st.write("### Predicted Prices Plot") | |
fig, ax = plt.subplots(figsize=(12, 6)) | |
ax.plot(future_dates, future_prices, marker='o', label="Predicted Prices") | |
ax.set_title("Future Predicted Prices") | |
ax.set_xlabel("Date") | |
ax.set_ylabel("Price") | |
ax.legend() | |
st.pyplot(fig) | |
if __name__ == "__main__": | |
main() | |