import datetime as dt
import json
import os
import urllib.error
import urllib.request
import zipfile
from datetime import date
from pathlib import Path
from typing import List

import pandas as pd
import requests

from meta.config import BINANCE_BASE_URL
from meta.config import TIME_ZONE_BERLIN
from meta.config import TIME_ZONE_JAKARTA
from meta.config import TIME_ZONE_PARIS
from meta.config import TIME_ZONE_SELFDEFINED
from meta.config import TIME_ZONE_SHANGHAI
from meta.config import TIME_ZONE_USEASTERN
from meta.config import USE_TIME_ZONE_SELFDEFINED
from meta.data_processors._base import _Base
from meta.data_processors._base import check_date

# from _base import check_date


class Binance(_Base):
    def __init__(
        self,
        data_source: str,
        start_date: str,
        end_date: str,
        time_interval: str,
        **kwargs,
    ):
        if time_interval == "1D":
            raise ValueError("Please use the time_interval 1d instead of 1D")
        if time_interval == "1d":
            check_date(start_date)
            check_date(end_date)
        super().__init__(data_source, start_date, end_date, time_interval, **kwargs)
        self.url = "https://api.binance.com/api/v3/klines"
        self.time_diff = None

    # main functions
    def download_data(
        self, ticker_list: List[str], save_path: str = "./data/dataset.csv"
    ):
        startTime = dt.datetime.strptime(self.start_date, "%Y-%m-%d")
        endTime = dt.datetime.strptime(self.end_date, "%Y-%m-%d")

        self.start_time = self.stringify_dates(startTime)
        self.end_time = self.stringify_dates(endTime)
        self.interval = self.time_interval
        self.limit = 1440

        # 1s for now, will add support for variable time and variable tick soon
        if self.time_interval == "1s":
            # as per https://binance-docs.github.io/apidocs/spot/en/#compressed-aggregate-trades-list
            self.limit = 1000
            final_df = self.fetch_n_combine(
                self.start_date, self.end_date, ticker_list
            )
        else:
            final_df = pd.DataFrame()
            for i in ticker_list:
                hist_data = self.dataframe_with_limit(symbol=i)
                df = hist_data.iloc[:-1].dropna()
                df["tic"] = i
                final_df = pd.concat([final_df, df], axis=0, join="outer")
        self.dataframe = final_df

        self.save_data(save_path)

        print(
            f"Download complete! Dataset saved to {save_path}. \n"
            f"Shape of DataFrame: {self.dataframe.shape}"
        )
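    # Sketch of the frame download_data produces for kline intervals (columns
    # derived from get_binance_bars / dataframe_with_limit below; values are
    # placeholders, not real market data):
    #
    #   time                 open  high  low  close  volume  adjusted_close  tic
    #   2021-09-01 00:00:00  ...   ...   ...  ...    ...     ...             BTCUSDT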
    # def clean_data(self, df):
    #     df = df.dropna()
    #     return df

    # def add_technical_indicator(self, df, tech_indicator_list):
    #     print('Adding self-defined technical indicators is NOT supported yet.')
    #     print('Use default: MACD, RSI, CCI, DX.')
    #     self.tech_indicator_list = ['open', 'high', 'low', 'close', 'volume',
    #                                 'macd', 'macd_signal', 'macd_hist',
    #                                 'rsi', 'cci', 'dx']
    #     final_df = pd.DataFrame()
    #     for i in df.tic.unique():
    #         tic_df = df[df.tic == i]
    #         tic_df['macd'], tic_df['macd_signal'], tic_df['macd_hist'] = MACD(
    #             tic_df['close'], fastperiod=12, slowperiod=26, signalperiod=9)
    #         tic_df['rsi'] = RSI(tic_df['close'], timeperiod=14)
    #         tic_df['cci'] = CCI(tic_df['high'], tic_df['low'], tic_df['close'], timeperiod=14)
    #         tic_df['dx'] = DX(tic_df['high'], tic_df['low'], tic_df['close'], timeperiod=14)
    #         final_df = final_df.append(tic_df)
    #     return final_df

    # def add_turbulence(self, df):
    #     print('Turbulence not supported yet. Return original DataFrame.')
    #     return df

    # def add_vix(self, df):
    #     print('VIX is not applicable for cryptocurrencies. Return original DataFrame.')
    #     return df

    # def df_to_array(self, df, tech_indicator_list, if_vix):
    #     unique_ticker = df.tic.unique()
    #     price_array = np.column_stack([df[df.tic == tic].close for tic in unique_ticker])
    #     tech_array = np.hstack([df.loc[(df.tic == tic), tech_indicator_list] for tic in unique_ticker])
    #     assert price_array.shape[0] == tech_array.shape[0]
    #     return price_array, tech_array, np.array([])

    # helper functions
    def stringify_dates(self, date: dt.datetime):
        return str(int(date.timestamp() * 1000))

    def get_binance_bars(self, last_datetime, symbol):
        """
        klines api returns data in the following order:
        open_time, open_price, high_price, low_price, close_price,
        volume, close_time, quote_asset_volume, n_trades,
        taker_buy_base_asset_volume, taker_buy_quote_asset_volume, ignore
        """
        req_params = {
            "symbol": symbol,
            "interval": self.interval,
            "startTime": last_datetime,
            "endTime": self.end_time,
            "limit": self.limit,
        }
        # For debugging, uncomment these lines; if they raise an error,
        # req_params is likely malformed
        # r = requests.get(self.url, params=req_params)
        # print(r.text)
        df = pd.DataFrame(requests.get(self.url, params=req_params).json())

        if df.empty:
            return None

        df = df.iloc[:, 0:6]
        df.columns = ["datetime", "open", "high", "low", "close", "volume"]

        df[["open", "high", "low", "close", "volume"]] = df[
            ["open", "high", "low", "close", "volume"]
        ].astype(float)

        # No stock split and dividend announcement, hence adjusted close is the same as close
        df["adjusted_close"] = df["close"]
        df["datetime"] = df.datetime.apply(
            lambda x: dt.datetime.fromtimestamp(x / 1000.0)
        )
        df.reset_index(drop=True, inplace=True)
        return df

    def get_newest_bars(self, symbols, interval, limit):
        merged_df = pd.DataFrame()
        for symbol in symbols:
            req_params = {
                "symbol": symbol,
                "interval": interval,
                "limit": limit,
            }
            df = pd.DataFrame(
                requests.get(self.url, params=req_params).json(),
                index=range(limit),
            )

            if df.empty:
                return None

            df = df.iloc[:, 0:6]
            df.columns = ["datetime", "open", "high", "low", "close", "volume"]

            df[["open", "high", "low", "close", "volume"]] = df[
                ["open", "high", "low", "close", "volume"]
            ].astype(float)

            # No stock split and dividend announcement, hence adjusted close is the same as close
            df["adjusted_close"] = df["close"]
            df["datetime"] = df.datetime.apply(
                lambda x: dt.datetime.fromtimestamp(x / 1000.0)
            )
            df["tic"] = symbol
            df = df.rename(columns={"datetime": "time"})
            df.reset_index(drop=True, inplace=True)
            # DataFrame.append was removed in pandas 2.0; use pd.concat instead
            merged_df = pd.concat([merged_df, df], axis=0, join="outer")
        return merged_df

    def dataframe_with_limit(self, symbol):
        final_df = pd.DataFrame()
        last_datetime = self.start_time
        while True:
            new_df = self.get_binance_bars(last_datetime, symbol)
            if new_df is None:
                break
            if last_datetime == self.end_time:
                break
            final_df = pd.concat([final_df, new_df], axis=0, join="outer")
            # last_datetime = max(new_df.datetime) + dt.timedelta(days=1)
            last_datetime = max(new_df.datetime)
            if isinstance(last_datetime, pd.Timestamp):
                last_datetime = last_datetime.to_pydatetime()
            if self.time_diff is None:
                self.time_diff = new_df.loc[1]["datetime"] - new_df.loc[0]["datetime"]
            last_datetime = last_datetime + self.time_diff
            last_datetime = self.stringify_dates(last_datetime)

        date_value = final_df["datetime"].apply(
            lambda x: x.strftime("%Y-%m-%d %H:%M:%S")
        )
        final_df.insert(0, "time", date_value)
        final_df.drop("datetime", inplace=True, axis=1)
        return final_df
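    # Pagination sketch: the klines endpoint is paged, so dataframe_with_limit
    # requests up to self.limit bars at a time, advancing startTime by one bar
    # interval (self.time_diff) past the last returned timestamp until endTime
    # is reached. stringify_dates produces the epoch-millisecond string the API
    # expects, e.g. (illustrative value, on a UTC host):
    #
    #   self.stringify_dates(dt.datetime(2021, 9, 1))  ->  "1630454400000"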
    def get_download_url(self, file_url):
        return f"{BINANCE_BASE_URL}{file_url}"

    # downloads the zip, unzips it, and deletes the zip
    def download_n_unzip_file(self, base_path, file_name, date_range=None):
        download_path = f"{base_path}{file_name}"
        if date_range:
            date_range = date_range.replace(" ", "_")
            base_path = os.path.join(base_path, date_range)

        # raw_cache_dir = get_destination_dir("./cache/tick_raw")
        raw_cache_dir = "./cache/tick_raw"
        zip_save_path = os.path.join(raw_cache_dir, file_name)

        csv_name = os.path.splitext(file_name)[0] + ".csv"
        csv_save_path = os.path.join(raw_cache_dir, csv_name)

        fhandles = []

        if os.path.exists(csv_save_path):
            print(f"\nfile already exists! {csv_save_path}")
            return [csv_save_path]

        # make the "cache" directory (only) if it does not exist yet
        if not os.path.exists(raw_cache_dir):
            Path(raw_cache_dir).mkdir(parents=True, exist_ok=True)

        try:
            download_url = self.get_download_url(download_path)
            dl_file = urllib.request.urlopen(download_url)
            length = dl_file.getheader("content-length")
            blocksize = 4096  # fallback if the server sends no content-length
            if length:
                length = int(length)
                blocksize = max(4096, length // 100)

            with open(zip_save_path, "wb") as out_file:
                dl_progress = 0
                print(f"\nFile Download: {zip_save_path}")
                while True:
                    buf = dl_file.read(blocksize)
                    if not buf:
                        break
                    out_file.write(buf)
                    # visuals
                    # dl_progress += len(buf)
                    # done = int(50 * dl_progress / length)
                    # sys.stdout.write("\r[%s%s]" % ('#' * done, '.' * (50 - done)))
                    # sys.stdout.flush()

            # unzip and delete the zip
            with zipfile.ZipFile(zip_save_path) as zip_file:
                # guaranteed just 1 csv per archive
                csvpath = zip_file.extract(zip_file.namelist()[0], raw_cache_dir)
                fhandles.append(csvpath)
            os.remove(zip_save_path)
            return fhandles
        except urllib.error.HTTPError:
            print(f"\nFile not found: {download_url}")

    def convert_to_date_object(self, d):
        year, month, day = [int(x) for x in d.split("-")]
        return date(year, month, day)

    def get_path(
        self,
        trading_type,
        market_data_type,
        time_period,
        symbol,
        interval=None,
    ):
        trading_type_path = "data/spot"  # currently just supporting spot
        if trading_type != "spot":
            trading_type_path = f"data/futures/{trading_type}"
        return (
            f"{trading_type_path}/{time_period}/{market_data_type}/{symbol.upper()}/{interval}/"
            if interval is not None
            else f"{trading_type_path}/{time_period}/{market_data_type}/{symbol.upper()}/"
        )

    # helpers for manipulating tick level data (1s intervals)
    def download_daily_aggTrades(
        self, symbols, num_symbols, dates, start_date, end_date
    ):
        trading_type = "spot"
        date_range = start_date + " " + end_date
        start_date = self.convert_to_date_object(start_date)
        end_date = self.convert_to_date_object(end_date)

        print(f"Found {num_symbols} symbols")

        file_map = {}
        for current, symbol in enumerate(symbols):
            file_map[symbol] = []
            print(
                f"[{current + 1}/{num_symbols}] - start download daily {symbol} aggTrades"
            )
            for date_str in dates:
                current_date = self.convert_to_date_object(date_str)
                if start_date <= current_date <= end_date:
                    path = self.get_path(trading_type, "aggTrades", "daily", symbol)
                    file_name = f"{symbol.upper()}-aggTrades-{date_str}.zip"
                    fhandle = self.download_n_unzip_file(path, file_name, date_range)
                    file_map[symbol] += fhandle
        return file_map
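    # Download URL sketch: get_path plus the file name yields the relative path
    # that get_download_url appends to BINANCE_BASE_URL, e.g. for BTCUSDT on
    # 2021-09-01 (illustrative symbol and date):
    #
    #   data/spot/daily/aggTrades/BTCUSDT/BTCUSDT-aggTrades-2021-09-01.zip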
    def fetch_aggTrades(self, startDate: str, endDate: str, tickers: List[str]):
        # all valid symbols traded on the v3 api
        response = urllib.request.urlopen(
            "https://api.binance.com/api/v3/exchangeInfo"
        ).read()
        valid_symbols = [
            symbol["symbol"] for symbol in json.loads(response)["symbols"]
        ]
        for tic in tickers:
            if tic not in valid_symbols:
                print(tic + " is not a valid ticker, removing it from the download")
        tickers = list(set(tickers) & set(valid_symbols))
        num_symbols = len(tickers)
        # not adding tz yet
        # T-1 day is fetched so missing data at the first day's 00:00:00 can be ffilled (if any)
        tminus1 = (self.convert_to_date_object(startDate) - dt.timedelta(1)).strftime(
            "%Y-%m-%d"
        )
        dates = pd.date_range(start=tminus1, end=endDate)
        dates = [date.strftime("%Y-%m-%d") for date in dates]
        return self.download_daily_aggTrades(
            tickers, num_symbols, dates, tminus1, endDate
        )

    # Dict[str, List[str]] -> pd.DataFrame
    def combine_raw(self, file_map):
        # same format as jingyang's current data format
        final_df = pd.DataFrame()
        # using aggTrades with headers from https://github.com/binance/binance-public-data/
        colNames = [
            "AggregatetradeId",
            "Price",
            "volume",
            "FirsttradeId",
            "LasttradeId",
            "time",
            "buyerWasMaker",
            "tradeWasBestPriceMatch",
        ]
        for tic in file_map.keys():
            security = pd.DataFrame()
            for i, csv in enumerate(file_map[tic]):
                dailyticks = pd.read_csv(csv, names=colNames)
                # epoch milliseconds -> DatetimeIndex
                # (read_csv's date_parser argument was removed in pandas 2.x)
                dailyticks["time"] = pd.to_datetime(dailyticks["time"], unit="ms")
                dailyticks.set_index("time", inplace=True)
                dailyfinal = dailyticks.resample("1s").agg(
                    {"Price": "ohlc", "volume": "sum"}
                )
                dailyfinal.columns = dailyfinal.columns.droplevel(0)

                # favor continuous series
                # dailyfinal.dropna(inplace=True)

                # implemented T-1 day ffill of missing values at day start
                # guaranteed first csv is the tminus1 day
                if i == 0:
                    tmr = dailyfinal.index[0].date() + dt.timedelta(1)
                    tmr_dt = dt.datetime.combine(tmr, dt.time.min)
                    last_time_stamp_dt = dailyfinal.index[-1].to_pydatetime()
                    s_delta = (tmr_dt - last_time_stamp_dt).seconds
                    lastsample = dailyfinal.iloc[-1:]
                    lastsample.index = lastsample.index.shift(s_delta, "s")
                else:
                    day_dt = dailyfinal.index[0].date()
                    day_str = day_dt.strftime("%Y-%m-%d")
                    nextday_str = (day_dt + dt.timedelta(1)).strftime("%Y-%m-%d")
                    if dailyfinal.index[0].second != 0:
                        # prepend the last sample of the previous day
                        # (DataFrame.append was removed in pandas 2.0)
                        dailyfinal = pd.concat([lastsample, dailyfinal])
                    # otherwise, just reindex and ffill
                    dailyfinal = dailyfinal.reindex(
                        pd.date_range(day_str, nextday_str, freq="1s")[:-1],
                        method="ffill",
                    )
                    # save reference info (guaranteed to be :59)
                    lastsample = dailyfinal.iloc[-1:]
                    lastsample.index = lastsample.index.shift(1, "s")

                    if dailyfinal.shape[0] != 86400:
                        raise ValueError("every day should have 86400 datapoints")

                    # only save the real startDate - endDate range
                    security = pd.concat([security, dailyfinal])

            security.ffill(inplace=True)
            security["tic"] = tic
            final_df = pd.concat([final_df, security])
        return final_df

    def fetch_n_combine(self, startDate, endDate, tickers):
        # return combine_raw(fetch_aggTrades(startDate, endDate, tickers))
        mapping = self.fetch_aggTrades(startDate, endDate, tickers)
        return self.combine_raw(mapping)
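
# Minimal usage sketch (an illustration, not part of the processor itself):
# assumes the meta package is importable, network access to Binance, and that
# the symbols below are listed on spot; dates and the save path are examples.
if __name__ == "__main__":
    binance = Binance(
        data_source="binance",
        start_date="2021-09-01",
        end_date="2021-09-20",
        time_interval="1d",
    )
    binance.download_data(["BTCUSDT", "ETHUSDT"], save_path="./data/dataset.csv")
    print(binance.dataframe.head())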