|
import data_source.crypto_compare as cc |
|
import matplotlib.pyplot as plt |
|
import pandas as pd |
|
import numpy as np |
|
from sklearn.preprocessing import MinMaxScaler |
|
from keras.models import Sequential, load_model |
|
from keras.layers import Dense |
|
from keras.layers import LSTM |
|
from keras.layers import Dropout |
|
import logging |
|
import os |
|
|
|
|
|
|
|
def import_tensorflow(): |
|
|
|
|
|
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' |
|
os.environ['CUDA_VISIBLE_DEVICES'] = '-1' |
|
|
|
import warnings |
|
|
|
warnings.simplefilter(action='ignore', category=FutureWarning) |
|
warnings.simplefilter(action='ignore', category=Warning) |
|
|
|
import tensorflow as tf |
|
tf.get_logger().setLevel('INFO') |
|
tf.autograph.set_verbosity(0) |
|
|
|
tf.get_logger().setLevel(logging.ERROR) |
|
|
|
return tf |
|
tf = import_tensorflow() |
|
|
|
class BtcLtsm: |
|
def __init__(self): |
|
self._data_source = cc.CryptoCompare() |
|
self._train_name_base = 'btc_price_train' |
|
self._test_name_base = 'btc_price_test' |
|
self._model_name_base = 'btc_ltsm' |
|
self._history = 60 |
|
self._layer_size = 50 |
|
self._dropout = 0.2 |
|
|
|
def update_dataset(self, percent_train=0.98, limit=2000): |
|
try: |
|
ohlcv_df = self._data_source.get_daily_history('BTC', 'USDT', limit=limit) |
|
|
|
test_start_idx = int(len(ohlcv_df) * percent_train) |
|
|
|
train_df = ohlcv_df[:test_start_idx] |
|
test_df = ohlcv_df[test_start_idx:] |
|
train_df.to_csv(os.path.join('datasets', f'{self._train_name_base}.csv'), index=False) |
|
test_df.to_csv(os.path.join('datasets', f'{self._test_name_base}.csv'), index=False) |
|
return True |
|
except Exception as e: |
|
|
|
print(f"An error occurred: {e}") |
|
return False |
|
|
|
def train(self): |
|
train_file_name = os.path.join('datasets', f'{self._train_name_base}.csv') |
|
data_train = pd.read_csv(train_file_name) |
|
train_set = data_train.iloc[:, 1:2].values |
|
|
|
sc = MinMaxScaler(feature_range=(0, 1)) |
|
train_set = sc.fit_transform(train_set) |
|
logging.debug(f'training set:\n{train_set}') |
|
|
|
|
|
history = 60 |
|
features_train = [] |
|
results_train = [] |
|
for i in range(history, len(train_set)): |
|
features_train.append(train_set[i-history:i, 0]) |
|
results_train.append(train_set[i, 0]) |
|
features_train, results_train = np.array(features_train), np.array(results_train) |
|
|
|
|
|
features_train = np.reshape(features_train, (features_train.shape[0], features_train.shape[1], 1)) |
|
|
|
model_path = os.path.join('predictors/saved', f'{self._model_name_base}.h5') |
|
self._create_rnn(model_path, features_train, results_train) |
|
|
|
def load(self): |
|
model_path = os.path.join('predictors/saved', f'{self._model_name_base}.h5') |
|
self._regressor = load_model(model_path) |
|
|
|
def test_model(self): |
|
train_file_name = os.path.join('datasets', f'{self._train_name_base}.csv') |
|
test_file_name = os.path.join('datasets', f'{self._test_name_base}.csv') |
|
|
|
|
|
dataset_test = pd.read_csv(test_file_name) |
|
real_stock_price = dataset_test.iloc[:, 1:2].values |
|
|
|
dataset_train = pd.read_csv(train_file_name) |
|
train_set = dataset_train.iloc[:, 1:2].values |
|
|
|
sc = MinMaxScaler(feature_range = (0, 1)) |
|
train_set = sc.fit_transform(train_set) |
|
|
|
|
|
dataset_total = pd.concat((dataset_train['open'], dataset_test['open']), axis = 0) |
|
inputs = dataset_total[len(dataset_total) - len(dataset_test) - 60:].values |
|
inputs = inputs.reshape(-1,1) |
|
inputs = sc.transform(inputs) |
|
features_test = [] |
|
for i in range(self._history, len(inputs)): |
|
features_test.append(inputs[i-self._history:i, 0]) |
|
features_test = np.array(features_test) |
|
features_test = np.reshape(features_test, (features_test.shape[0], features_test.shape[1], 1)) |
|
predicted_stock_price = self._regressor.predict(features_test) |
|
predicted_stock_price = sc.inverse_transform(predicted_stock_price) |
|
|
|
|
|
plt.plot(real_stock_price, color = 'red', label = 'Real Price') |
|
plt.plot(predicted_stock_price, color = 'blue', label = 'Predicted Price') |
|
plt.title('BTC Price Prediction') |
|
plt.xlabel('Time') |
|
plt.ylabel('BTC Price') |
|
plt.legend() |
|
plt.savefig('btc_price_prediction.png') |
|
|
|
def _create_rnn(self, model_name, features_train, results_train, epochs=100, batch_size=32): |
|
|
|
self._regressor = Sequential() |
|
|
|
|
|
self._regressor.add(LSTM(units = self._layer_size, return_sequences = True, input_shape = (features_train.shape[1], 1))) |
|
self._regressor.add(Dropout(self._dropout)) |
|
|
|
|
|
self._regressor.add(LSTM(units = self._layer_size, return_sequences = True)) |
|
self._regressor.add(Dropout(self._dropout)) |
|
|
|
|
|
self._regressor.add(LSTM(units = self._layer_size, return_sequences = True)) |
|
self._regressor.add(Dropout(self._dropout)) |
|
|
|
|
|
self._regressor.add(LSTM(units = self._layer_size)) |
|
self._regressor.add(Dropout(self._dropout)) |
|
|
|
|
|
self._regressor.add(Dense(units = 1)) |
|
|
|
|
|
self._regressor.compile(optimizer = 'adam', loss = 'mean_squared_error') |
|
|
|
|
|
self._regressor.fit(features_train, results_train, epochs = epochs, batch_size = batch_size) |
|
self._regressor.save(model_ethereum_mixtral) |