kristada673's picture
Upload 19 files
de6e775
import calendar
from datetime import datetime
from typing import List
import ccxt
import numpy as np
import pandas as pd
from meta.data_processors._base import _Base
# from basic_processor import _Base
class Ccxt(_Base):
def __init__(
self,
data_source: str,
start_date: str,
end_date: str,
time_interval: str,
**kwargs,
):
super().__init__(data_source, start_date, end_date, time_interval, **kwargs)
self.binance = ccxt.binance()
def download_data(
self, ticker_list: List[str], save_path: str = "./data/dataset.csv"
):
crypto_column = pd.MultiIndex.from_product(
[ticker_list, ["open", "high", "low", "close", "volume"]]
)
first_time = True
for ticker in ticker_list:
start_dt = datetime.strptime(self.start_date, "%Y%m%d %H:%M:%S")
end_dt = datetime.strptime(self.end_date, "%Y%m%d %H:%M:%S")
start_timestamp = calendar.timegm(start_dt.utctimetuple())
end_timestamp = calendar.timegm(end_dt.utctimetuple())
if self.time_interval == "1Min":
date_list = [
datetime.utcfromtimestamp(float(time))
for time in range(start_timestamp, end_timestamp, 60 * 720)
]
else:
date_list = [
datetime.utcfromtimestamp(float(time))
for time in range(start_timestamp, end_timestamp, 60 * 1440)
]
df = self.ohlcv(date_list, ticker, self.time_interval)
if first_time:
dataset = pd.DataFrame(columns=crypto_column, index=df["time"].values)
first_time = False
temp_col = pd.MultiIndex.from_product(
[[ticker], ["open", "high", "low", "close", "volume"]]
)
dataset[temp_col] = df[["open", "high", "low", "close", "volume"]].values
print("Actual end time: " + str(df["time"].values[-1]))
self.dataframe = dataset
self.save_data(save_path)
print(
f"Download complete! Dataset saved to {save_path}. \nShape of DataFrame: {self.dataframe.shape}"
)
# def add_technical_indicators(self, df, pair_list, tech_indicator_list = [
# 'macd', 'boll_ub', 'boll_lb', 'rsi_30', 'dx_30',
# 'close_30_sma', 'close_60_sma']):
# df = df.dropna()
# df = df.copy()
# column_list = [pair_list, ['open','high','low','close','volume']+(tech_indicator_list)]
# column = pd.MultiIndex.from_product(column_list)
# index_list = df.index
# dataset = pd.DataFrame(columns=column,index=index_list)
# for pair in pair_list:
# pair_column = pd.MultiIndex.from_product([[pair],['open','high','low','close','volume']])
# dataset[pair_column] = df[pair]
# temp_df = df[pair].reset_index().sort_values(by=['index'])
# temp_df = temp_df.rename(columns={'index':'date'})
# crypto_df = Sdf.retype(temp_df.copy())
# for indicator in tech_indicator_list:
# temp_indicator = crypto_df[indicator].values.tolist()
# dataset[(pair,indicator)] = temp_indicator
# print('Succesfully add technical indicators')
# return dataset
def df_to_ary(self, pair_list, tech_indicator_list=None):
if tech_indicator_list is None:
tech_indicator_list = [
"macd",
"boll_ub",
"boll_lb",
"rsi_30",
"dx_30",
"close_30_sma",
"close_60_sma",
]
df = self.dataframe
df = df.dropna()
date_ary = df.index.values
price_array = df[pd.MultiIndex.from_product([pair_list, ["close"]])].values
tech_array = df[
pd.MultiIndex.from_product([pair_list, tech_indicator_list])
].values
return price_array, tech_array, date_ary
def min_ohlcv(self, dt, pair, limit):
since = calendar.timegm(dt.utctimetuple()) * 1000
return self.binance.fetch_ohlcv(
symbol=pair, timeframe="1m", since=since, limit=limit
)
def ohlcv(self, dt, pair, period="1d"):
ohlcv = []
limit = 1000
if period == "1Min":
limit = 720
elif period == "1D":
limit = 1
elif period == "1H":
limit = 24
elif period == "5Min":
limit = 288
for i in dt:
start_dt = i
since = calendar.timegm(start_dt.utctimetuple()) * 1000
if period == "1Min":
ohlcv.extend(self.min_ohlcv(start_dt, pair, limit))
else:
ohlcv.extend(
self.binance.fetch_ohlcv(
symbol=pair, timeframe=period, since=since, limit=limit
)
)
df = pd.DataFrame(
ohlcv, columns=["time", "open", "high", "low", "close", "volume"]
)
df["time"] = [datetime.fromtimestamp(float(time) / 1000) for time in df["time"]]
df["open"] = df["open"].astype(np.float64)
df["high"] = df["high"].astype(np.float64)
df["low"] = df["low"].astype(np.float64)
df["close"] = df["close"].astype(np.float64)
df["volume"] = df["volume"].astype(np.float64)
return df