from flask import Flask, render_template, request, jsonify
import pandas as pd
import numpy as np
import pickle
import os
from datetime import datetime, timedelta
import matplotlib
matplotlib.use('Agg')  # Use non-interactive backend
import matplotlib.pyplot as plt
import seaborn as sns
import base64
import io

app = Flask(__name__)

# Load model and scaler
def load_model_and_scaler():
    try:
        with open('model.pkl', 'rb') as model_file:
            model = pickle.load(model_file)
        with open('scaler.pkl', 'rb') as scaler_file:
            scaler = pickle.load(scaler_file)
        return model, scaler
    except FileNotFoundError:
        return None, None
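
# Assumptions about the pickled artifacts (inferred from how they are used below, not stated
# in this file): model.pkl is a multi-output regressor whose .predict() returns a normalized
# (close, open) pair, and scaler.pkl is the two-column scaler (e.g. a MinMaxScaler fitted on
# ['close', 'open']) that was used during training.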

def load_data():
    """Load and preprocess the gold data"""
    try:
        data = pd.read_csv('gold.csv')
        data['date'] = pd.to_datetime(data['date'], format='%d/%m/%Y').dt.strftime('%Y-%m-%d')
        temp_df = data[['date', 'close', 'open']]

        # Preprocessing
        df = temp_df.copy()
        df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%d').dt.date
        df.set_index('date', inplace=True)
        df.index = pd.to_datetime(df.index)
        df = df.sort_index()
        return df
    except FileNotFoundError:
        return None

def normalize_data(df, scaler):
    """Normalize the data using the provided scaler"""
    np_data_unscaled = np.array(df)
    np_data_scaled = scaler.transform(np_data_unscaled)
    normalized_df = pd.DataFrame(np_data_scaled, columns=df.columns, index=df.index)
    return normalized_df
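
# Illustrative usage (assumes the scaler was fitted on the same two columns, e.g. a
# MinMaxScaler fitted on df[['close', 'open']] during training):
#   normalized_df = normalize_data(df[['close', 'open']], scaler)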

def sliding_window(data, lag):
    """Create sliding window features"""
    series_close = data['close']
    series_open = data['open']
    result = pd.DataFrame()

    # Add lag columns for 'close'
    for l in lag:
        result[f'close-{l}'] = series_close.shift(l)

    # Add lag columns for 'open'
    for l in lag:
        result[f'open-{l}'] = series_open.shift(l)

    # Add original 'close' and 'open' columns
    result['close'] = series_close[max(lag):]
    result['open'] = series_open[max(lag):]

    # Remove missing values (NaN)
    result = result.dropna()

    # Set index according to lag values
    result.index = series_close.index[max(lag):]
    return result
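
# For lag=[1, ..., 7] the result holds 16 columns per row: the current 'close' and 'open'
# plus the lagged features 'close-1'...'close-7' and 'open-1'...'open-7', i.e. each row
# pairs a day's prices with the prices of the preceding seven days.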

def predict_next_7_days(model, scaler, data):
    """Predict gold prices for the next 7 days"""
    # Normalize data
    normalized_df = normalize_data(data, scaler)

    # Create sliding window
    windowed_data = sliding_window(normalized_df, [1, 2, 3, 4, 5, 6, 7])
    windowed_data = windowed_data[['close', 'close-1', 'close-2', 'close-3', 'close-4', 'close-5', 'close-6', 'close-7',
                                   'open', 'open-1', 'open-2', 'open-3', 'open-4', 'open-5', 'open-6', 'open-7']]

    # Initialize predictions list
    predictions = []

    # Get last row of lag features as initial input
    last_row = windowed_data.drop(columns=['close', 'open']).iloc[-1].values.reshape(1, -1)

    # Iterate for 7 days
    for _ in range(7):
        # Predict value for next day
        predicted_value_normalized = model.predict(last_row)
        predicted_value = scaler.inverse_transform(predicted_value_normalized.reshape(-1, 2))

        # Save prediction
        predictions.append(predicted_value[0])

        # Update input for next iteration
        new_row_normalized = np.hstack([last_row[0, 2:], predicted_value_normalized[0]])
        last_row = new_row_normalized.reshape(1, -1)

    # Transform predictions to DataFrame
    predictions_df = pd.DataFrame(
        predictions,
        columns=['close', 'open'],
        index=pd.date_range(start=normalized_df.index[-1] + pd.Timedelta(days=1), periods=7)
    )

    # Get last observed price (denormalized)
    last_price = scaler.inverse_transform(normalized_df[['close', 'open']].iloc[-1].values.reshape(-1, 2))

    # Calculate daily percentage changes
    predictions_df['close_change'] = predictions_df['close'].pct_change().fillna(0) * 100
    predictions_df['open_change'] = predictions_df['open'].pct_change().fillna(0) * 100

    # Calculate total change from today to day 7
    total_close_change = ((predictions_df['close'].iloc[-1] - last_price[0][0]) / last_price[0][0]) * 100
    total_open_change = ((predictions_df['open'].iloc[-1] - last_price[0][1]) / last_price[0][1]) * 100

    return predictions_df, last_price[0], total_close_change, total_open_change
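
# Note: the forecast is recursive; each predicted (close, open) pair is fed back into the
# lag-feature vector for the next step, so errors can compound over the 7-day horizon.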

def create_prediction_chart(data, predictions_df, last_price):
    """Create a chart showing historical and predicted prices"""
    plt.style.use('default')
    fig, ax = plt.subplots(figsize=(12, 8))

    # Plot last 30 days of historical data
    recent_data = data.tail(30)
    ax.plot(recent_data.index, recent_data['close'], 'b-', label='Historical Close Price', linewidth=2)
    ax.plot(recent_data.index, recent_data['open'], 'g-', label='Historical Open Price', linewidth=2)

    # Add current day point
    ax.plot(recent_data.index[-1], last_price[0], 'ro', markersize=8, label='Current Close Price')
    ax.plot(recent_data.index[-1], last_price[1], 'go', markersize=8, label='Current Open Price')

    # Plot predictions
    ax.plot(predictions_df.index, predictions_df['close'], 'r--', label='Predicted Close Price', linewidth=2, marker='o')
    ax.plot(predictions_df.index, predictions_df['open'], 'orange', linestyle='--', label='Predicted Open Price', linewidth=2, marker='s')

    ax.set_title('Gold Price Prediction - Next 7 Days', fontsize=16, fontweight='bold')
    ax.set_xlabel('Date', fontsize=12)
    ax.set_ylabel('Price (IDR)', fontsize=12)
    ax.legend()
    ax.grid(True, alpha=0.3)

    # Format y-axis to show prices in millions
    ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'{x/1000000:.1f}M'))
    plt.xticks(rotation=45)
    plt.tight_layout()

    # Convert plot to base64 string
    img_buffer = io.BytesIO()
    plt.savefig(img_buffer, format='png', dpi=150, bbox_inches='tight')
    img_buffer.seek(0)
    img_string = base64.b64encode(img_buffer.read()).decode()
    plt.close()

    return img_string
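
# The returned string is a base64-encoded PNG; the front-end is expected to embed it as a
# data URI, e.g. <img src="data:image/png;base64,{{ chart }}"> (assumed, templates not shown here).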

# NOTE: route decorators were missing; the paths below are assumed from the templates and endpoints used.
@app.route('/')
def index():
    return render_template('index.html')

@app.route('/predict', methods=['POST'])  # path and method assumed
def predict():
    try:
        # Load model and scaler
        model, scaler = load_model_and_scaler()
        if model is None or scaler is None:
            return jsonify({'error': 'Model or scaler not found. Please train the model first.'}), 500

        # Load data
        data = load_data()
        if data is None:
            return jsonify({'error': 'Data file not found.'}), 500

        # Make predictions
        predictions_df, last_price, total_close_change, total_open_change = predict_next_7_days(model, scaler, data)

        # Create chart
        chart_img = create_prediction_chart(data, predictions_df, last_price)

        # Prepare response data (cast numpy scalars to plain floats so jsonify can serialize them)
        predictions_list = []
        for date, row in predictions_df.iterrows():
            predictions_list.append({
                'date': date.strftime('%Y-%m-%d'),
                'close_price': round(float(row['close']), 2),
                'open_price': round(float(row['open']), 2),
                'close_change': round(float(row['close_change']), 2),
                'open_change': round(float(row['open_change']), 2)
            })

        response = {
            'success': True,
            'current_prices': {
                'close': round(float(last_price[0]), 2),
                'open': round(float(last_price[1]), 2)
            },
            'predictions': predictions_list,
            'total_changes': {
                'close': round(float(total_close_change), 2),
                'open': round(float(total_open_change), 2)
            },
            'chart': chart_img
        }
        return jsonify(response)

    except Exception as e:
        return jsonify({'error': f'An error occurred: {str(e)}'}), 500
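
# Example call (assuming the /predict route above, with the app running on port 7860):
#   curl -X POST http://localhost:7860/predict
# The JSON response carries current_prices, a 7-entry predictions list, total_changes, and
# the prediction chart as a base64-encoded PNG string.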

@app.route('/data_analysis')  # path assumed
def data_analysis():
    """Show data analysis page"""
    try:
        data = load_data()
        if data is None:
            return render_template('error.html', error='Data file not found.')

        # Create historical price chart
        fig, ax = plt.subplots(figsize=(12, 6))
        ax.plot(data.index, data['close'], 'b-', label='Close Price', linewidth=1.5)
        ax.plot(data.index, data['open'], 'g-', label='Open Price', linewidth=1.5)
        ax.set_title('Historical Gold Prices', fontsize=16, fontweight='bold')
        ax.set_xlabel('Date', fontsize=12)
        ax.set_ylabel('Price (IDR)', fontsize=12)
        ax.legend()
        ax.grid(True, alpha=0.3)
        ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'{x/1000000:.1f}M'))
        plt.xticks(rotation=45)
        plt.tight_layout()

        img_buffer = io.BytesIO()
        plt.savefig(img_buffer, format='png', dpi=150, bbox_inches='tight')
        img_buffer.seek(0)
        historical_chart = base64.b64encode(img_buffer.read()).decode()
        plt.close()

        # Calculate statistics
        stats = {
            'total_records': len(data),
            'date_range': f"{data.index.min().strftime('%Y-%m-%d')} to {data.index.max().strftime('%Y-%m-%d')}",
            'avg_close': round(data['close'].mean(), 2),
            'avg_open': round(data['open'].mean(), 2),
            'min_close': round(data['close'].min(), 2),
            'max_close': round(data['close'].max(), 2),
            'current_close': round(data['close'].iloc[-1], 2),
            'current_open': round(data['open'].iloc[-1], 2)
        }
        return render_template('data_analysis.html', chart=historical_chart, stats=stats)

    except Exception as e:
        return render_template('error.html', error=f'An error occurred: {str(e)}')

if __name__ == '__main__':
    # For Hugging Face Spaces, use port 7860
    port = int(os.environ.get('PORT', 7860))
    app.run(host='0.0.0.0', port=port, debug=False)