import calendar from datetime import datetime from typing import List import ccxt import numpy as np import pandas as pd from meta.data_processors._base import _Base # from basic_processor import _Base class Ccxt(_Base): def __init__( self, data_source: str, start_date: str, end_date: str, time_interval: str, **kwargs, ): super().__init__(data_source, start_date, end_date, time_interval, **kwargs) self.binance = ccxt.binance() def download_data( self, ticker_list: List[str], save_path: str = "./data/dataset.csv" ): crypto_column = pd.MultiIndex.from_product( [ticker_list, ["open", "high", "low", "close", "volume"]] ) first_time = True for ticker in ticker_list: start_dt = datetime.strptime(self.start_date, "%Y%m%d %H:%M:%S") end_dt = datetime.strptime(self.end_date, "%Y%m%d %H:%M:%S") start_timestamp = calendar.timegm(start_dt.utctimetuple()) end_timestamp = calendar.timegm(end_dt.utctimetuple()) if self.time_interval == "1Min": date_list = [ datetime.utcfromtimestamp(float(time)) for time in range(start_timestamp, end_timestamp, 60 * 720) ] else: date_list = [ datetime.utcfromtimestamp(float(time)) for time in range(start_timestamp, end_timestamp, 60 * 1440) ] df = self.ohlcv(date_list, ticker, self.time_interval) if first_time: dataset = pd.DataFrame(columns=crypto_column, index=df["time"].values) first_time = False temp_col = pd.MultiIndex.from_product( [[ticker], ["open", "high", "low", "close", "volume"]] ) dataset[temp_col] = df[["open", "high", "low", "close", "volume"]].values print("Actual end time: " + str(df["time"].values[-1])) self.dataframe = dataset self.save_data(save_path) print( f"Download complete! Dataset saved to {save_path}. \nShape of DataFrame: {self.dataframe.shape}" ) # def add_technical_indicators(self, df, pair_list, tech_indicator_list = [ # 'macd', 'boll_ub', 'boll_lb', 'rsi_30', 'dx_30', # 'close_30_sma', 'close_60_sma']): # df = df.dropna() # df = df.copy() # column_list = [pair_list, ['open','high','low','close','volume']+(tech_indicator_list)] # column = pd.MultiIndex.from_product(column_list) # index_list = df.index # dataset = pd.DataFrame(columns=column,index=index_list) # for pair in pair_list: # pair_column = pd.MultiIndex.from_product([[pair],['open','high','low','close','volume']]) # dataset[pair_column] = df[pair] # temp_df = df[pair].reset_index().sort_values(by=['index']) # temp_df = temp_df.rename(columns={'index':'date'}) # crypto_df = Sdf.retype(temp_df.copy()) # for indicator in tech_indicator_list: # temp_indicator = crypto_df[indicator].values.tolist() # dataset[(pair,indicator)] = temp_indicator # print('Succesfully add technical indicators') # return dataset def df_to_ary(self, pair_list, tech_indicator_list=None): if tech_indicator_list is None: tech_indicator_list = [ "macd", "boll_ub", "boll_lb", "rsi_30", "dx_30", "close_30_sma", "close_60_sma", ] df = self.dataframe df = df.dropna() date_ary = df.index.values price_array = df[pd.MultiIndex.from_product([pair_list, ["close"]])].values tech_array = df[ pd.MultiIndex.from_product([pair_list, tech_indicator_list]) ].values return price_array, tech_array, date_ary def min_ohlcv(self, dt, pair, limit): since = calendar.timegm(dt.utctimetuple()) * 1000 return self.binance.fetch_ohlcv( symbol=pair, timeframe="1m", since=since, limit=limit ) def ohlcv(self, dt, pair, period="1d"): ohlcv = [] limit = 1000 if period == "1Min": limit = 720 elif period == "1D": limit = 1 elif period == "1H": limit = 24 elif period == "5Min": limit = 288 for i in dt: start_dt = i since = calendar.timegm(start_dt.utctimetuple()) * 1000 if period == "1Min": ohlcv.extend(self.min_ohlcv(start_dt, pair, limit)) else: ohlcv.extend( self.binance.fetch_ohlcv( symbol=pair, timeframe=period, since=since, limit=limit ) ) df = pd.DataFrame( ohlcv, columns=["time", "open", "high", "low", "close", "volume"] ) df["time"] = [datetime.fromtimestamp(float(time) / 1000) for time in df["time"]] df["open"] = df["open"].astype(np.float64) df["high"] = df["high"].astype(np.float64) df["low"] = df["low"].astype(np.float64) df["close"] = df["close"].astype(np.float64) df["volume"] = df["volume"].astype(np.float64) return df