import datetime as dt
import json
import os
import urllib.error
import urllib.request
import zipfile
from datetime import date
from pathlib import Path
from typing import List

import pandas as pd
import requests

from meta.config import BINANCE_BASE_URL
from meta.config import TIME_ZONE_BERLIN
from meta.config import TIME_ZONE_JAKARTA
from meta.config import TIME_ZONE_PARIS
from meta.config import TIME_ZONE_SELFDEFINED
from meta.config import TIME_ZONE_SHANGHAI
from meta.config import TIME_ZONE_USEASTERN
from meta.config import USE_TIME_ZONE_SELFDEFINED
from meta.data_processors._base import _Base
from meta.data_processors._base import check_date

# from _base import check_date


class Binance(_Base):
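    """Binance data processor.

    Fetches historical klines from the Binance REST API
    (https://api.binance.com/api/v3/klines) for standard intervals such as
    1m, 1h, or 1d. When ``time_interval == "1s"``, 1-second bars are built
    instead by downloading daily aggTrades archives from Binance public data
    and resampling them.
    """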
    def __init__(
        self,
        data_source: str,
        start_date: str,
        end_date: str,
        time_interval: str,
        **kwargs,
    ):
        if time_interval == "1D":
            raise ValueError("Please use the time_interval 1d instead of 1D")
        if time_interval == "1d":
            check_date(start_date)
            check_date(end_date)
        super().__init__(data_source, start_date, end_date, time_interval, **kwargs)
        self.url = "https://api.binance.com/api/v3/klines"
        self.time_diff = None

    # main functions
    def download_data(
        self, ticker_list: List[str], save_path: str = "./data/dataset.csv"
    ):
        startTime = dt.datetime.strptime(self.start_date, "%Y-%m-%d")
        endTime = dt.datetime.strptime(self.end_date, "%Y-%m-%d")

        self.start_time = self.stringify_dates(startTime)
        self.end_time = self.stringify_dates(endTime)
        self.interval = self.time_interval
        self.limit = 1440

        # 1s for now, will add support for variable time and variable tick soon
        if self.time_interval == "1s":
            # as per https://binance-docs.github.io/apidocs/spot/en/#compressed-aggregate-trades-list
            self.limit = 1000
            final_df = self.fetch_n_combine(self.start_date, self.end_date, ticker_list)
        else:
            final_df = pd.DataFrame()
            for i in ticker_list:
                hist_data = self.dataframe_with_limit(symbol=i)
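                # drop the last row (which may be an incomplete bar) and any rows with NaNs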
                df = hist_data.iloc[:-1].dropna()
                df["tic"] = i
                final_df = pd.concat([final_df, df], axis=0, join="outer")
        self.dataframe = final_df

        self.save_data(save_path)

        print(
            f"Download complete! Dataset saved to {save_path}. \nShape of DataFrame: {self.dataframe.shape}"
        )

    # def clean_data(self, df):
    #     df = df.dropna()
    #     return df

    # def add_technical_indicator(self, df, tech_indicator_list):
    #     print('Adding self-defined technical indicators is NOT supported yet.')
    #     print('Use default: MACD, RSI, CCI, DX.')
    #     self.tech_indicator_list = ['open', 'high', 'low', 'close', 'volume',
    #                                 'macd', 'macd_signal', 'macd_hist',
    #                                 'rsi', 'cci', 'dx']
    #     final_df = pd.DataFrame()
    #     for i in df.tic.unique():
    #         tic_df = df[df.tic==i]
    #         tic_df['macd'], tic_df['macd_signal'], tic_df['macd_hist'] = MACD(tic_df['close'], fastperiod=12,
    #                                                                             slowperiod=26, signalperiod=9)
    #         tic_df['rsi'] = RSI(tic_df['close'], timeperiod=14)
    #         tic_df['cci'] = CCI(tic_df['high'], tic_df['low'], tic_df['close'], timeperiod=14)
    #         tic_df['dx'] = DX(tic_df['high'], tic_df['low'], tic_df['close'], timeperiod=14)
    #         final_df = final_df.append(tic_df)
    #
    #     return final_df

    # def add_turbulence(self, df):
    #     print('Turbulence not supported yet. Return original DataFrame.')
    #
    #     return df

    # def add_vix(self, df):
    #     print('VIX is not applicable for cryptocurrencies. Return original DataFrame')
    #
    #     return df

    # def df_to_array(self, df, tech_indicator_list, if_vix):
    #     unique_ticker = df.tic.unique()
    #     price_array = np.column_stack([df[df.tic==tic].close for tic in unique_ticker])
    #     tech_array = np.hstack([df.loc[(df.tic==tic), tech_indicator_list] for tic in unique_ticker])
    #     assert price_array.shape[0] == tech_array.shape[0]
    #     return price_array, tech_array, np.array([])

    # helper functions
    def stringify_dates(self, date: dt.datetime):
        return str(int(date.timestamp() * 1000))

    def get_binance_bars(self, last_datetime, symbol):
        """
        klines api returns data in the following order:
        open_time, open_price, high_price, low_price, close_price,
        volume, close_time, quote_asset_volume, n_trades,
        taker_buy_base_asset_volume, taker_buy_quote_asset_volume,
        ignore
        """
        req_params = {
            "symbol": symbol,
            "interval": self.interval,
            "startTime": last_datetime,
            "endTime": self.end_time,
            "limit": self.limit,
        }
        # For debugging purposes, uncomment these lines and if they throw an error
        # then you may have an error in req_params
        # r = requests.get(self.url, params=req_params)
        # print(r.text)
        df = pd.DataFrame(requests.get(self.url, params=req_params).json())

        if df.empty:
            return None

        df = df.iloc[:, 0:6]
        df.columns = ["datetime", "open", "high", "low", "close", "volume"]

        df[["open", "high", "low", "close", "volume"]] = df[
            ["open", "high", "low", "close", "volume"]
        ].astype(float)

        # No stock splits or dividends for crypto, so adjusted close equals close
        df["adjusted_close"] = df["close"]
        df["datetime"] = df.datetime.apply(
            lambda x: dt.datetime.fromtimestamp(x / 1000.0)
        )
        df.reset_index(drop=True, inplace=True)

        return df

    def get_newest_bars(self, symbols, interval, limit):
        merged_df = pd.DataFrame()
        for symbol in symbols:
            req_params = {
                "symbol": symbol,
                "interval": interval,
                "limit": limit,
            }

            df = pd.DataFrame(requests.get(self.url, params=req_params).json())

            if df.empty:
                return None

            df = df.iloc[:, 0:6]
            df.columns = ["datetime", "open", "high", "low", "close", "volume"]

            df[["open", "high", "low", "close", "volume"]] = df[
                ["open", "high", "low", "close", "volume"]
            ].astype(float)

            # No stock splits or dividends for crypto, so adjusted close equals close
            df["adjusted_close"] = df["close"]
            df["datetime"] = df.datetime.apply(
                lambda x: dt.datetime.fromtimestamp(x / 1000.0)
            )
            df["tic"] = symbol
            df = df.rename(columns={"datetime": "time"})
            df.reset_index(drop=True, inplace=True)
            merged_df = pd.concat([merged_df, df], axis=0)

        return merged_df

    def dataframe_with_limit(self, symbol):
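        """Download all klines for ``symbol`` between start_time and end_time.

        The klines endpoint returns at most ``self.limit`` rows per request,
        so the start timestamp is advanced past the last received bar (by one
        bar width, ``self.time_diff``) and the request is repeated until the
        API returns no more data.
        """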
        final_df = pd.DataFrame()
        last_datetime = self.start_time
        while True:
            new_df = self.get_binance_bars(last_datetime, symbol)
            if new_df is None:
                break

            if last_datetime == self.end_time:
                break

            final_df = pd.concat([final_df, new_df], axis=0, join="outer")
            # last_datetime = max(new_df.datetime) + dt.timedelta(days=1)
            last_datetime = max(new_df.datetime)
            if isinstance(last_datetime, pd.Timestamp):
                last_datetime = last_datetime.to_pydatetime()

            if self.time_diff is None:
                self.time_diff = new_df.loc[1, "datetime"] - new_df.loc[0, "datetime"]

            last_datetime = last_datetime + self.time_diff
            last_datetime = self.stringify_dates(last_datetime)

        date_value = final_df["datetime"].apply(
            lambda x: x.strftime("%Y-%m-%d %H:%M:%S")
        )
        final_df.insert(0, "time", date_value)
        final_df.drop("datetime", inplace=True, axis=1)
        return final_df

    def get_download_url(self, file_url):
        return f"{BINANCE_BASE_URL}{file_url}"

    # downloads the zip, unzips it, and deletes the zip
    def download_n_unzip_file(self, base_path, file_name, date_range=None):
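        """Download a daily zip archive, extract its CSV into the tick cache, and delete the zip.

        Returns a one-element list with the path of the extracted (or already
        cached) CSV, or an empty list if the file could not be downloaded.
        """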
        download_path = f"{base_path}{file_name}"
        if date_range:
            date_range = date_range.replace(" ", "_")
            base_path = os.path.join(base_path, date_range)

        # raw_cache_dir = get_destination_dir("./cache/tick_raw")
        raw_cache_dir = "./cache/tick_raw"
        zip_save_path = os.path.join(raw_cache_dir, file_name)

        csv_name = os.path.splitext(file_name)[0] + ".csv"
        csv_save_path = os.path.join(raw_cache_dir, csv_name)

        fhandles = []

        if os.path.exists(csv_save_path):
            print(f"\nfile already exists! {csv_save_path}")
            return [csv_save_path]

        # make the "cache" directory (only)
        if not os.path.exists(raw_cache_dir):
            Path(raw_cache_dir).mkdir(parents=True, exist_ok=True)

        try:
            download_url = self.get_download_url(download_path)
            dl_file = urllib.request.urlopen(download_url)
            length = dl_file.getheader("content-length")
            # default chunk size in case the server sends no content-length header
            blocksize = 4096
            if length:
                length = int(length)
                blocksize = max(blocksize, length // 100)

            with open(zip_save_path, "wb") as out_file:
                dl_progress = 0
                print(f"\nFile Download: {zip_save_path}")
                while True:
                    buf = dl_file.read(blocksize)
                    if not buf:
                        break
                    out_file.write(buf)
                    # visuals
                    # dl_progress += len(buf)
                    # done = int(50 * dl_progress / length)
                    # sys.stdout.write("\r[%s%s]" % ('#' * done, '.' * (50-done)) )
                    # sys.stdout.flush()

            # unzip and delete zip
            with zipfile.ZipFile(zip_save_path) as zip_file:
                # guaranteed just 1 csv
                csvpath = zip_file.extract(zip_file.namelist()[0], raw_cache_dir)
                fhandles.append(csvpath)
            os.remove(zip_save_path)
            return fhandles

        except urllib.error.HTTPError:
            print(f"\nFile not found: {download_url}")
            return []

    def convert_to_date_object(self, d):
        year, month, day = [int(x) for x in d.split("-")]
        return date(year, month, day)

    def get_path(
        self,
        trading_type,
        market_data_type,
        time_period,
        symbol,
        interval=None,
    ):
        trading_type_path = "data/spot"
        # currently just supporting spot
        if trading_type != "spot":
            trading_type_path = f"data/futures/{trading_type}"
        return (
            f"{trading_type_path}/{time_period}/{market_data_type}/{symbol.upper()}/{interval}/"
            if interval is not None
            else f"{trading_type_path}/{time_period}/{market_data_type}/{symbol.upper()}/"
        )

    # helpers for manipulating tick level data (1s intervals)
    def download_daily_aggTrades(
        self, symbols, num_symbols, dates, start_date, end_date
    ):
        trading_type = "spot"
        date_range = start_date + " " + end_date
        start_date = self.convert_to_date_object(start_date)
        end_date = self.convert_to_date_object(end_date)

        print(f"Found {num_symbols} symbols")

        file_map = {}
        for current, symbol in enumerate(symbols):
            file_map[symbol] = []
            print(
                f"[{current + 1}/{num_symbols}] - start download daily {symbol} aggTrades "
            )
            for date_str in dates:
                current_date = self.convert_to_date_object(date_str)
                if start_date <= current_date <= end_date:
                    path = self.get_path(trading_type, "aggTrades", "daily", symbol)
                    file_name = f"{symbol.upper()}-aggTrades-{date_str}.zip"
                    fhandle = self.download_n_unzip_file(path, file_name, date_range)
                    file_map[symbol] += fhandle
        return file_map

    def fetch_aggTrades(self, startDate: str, endDate: str, tickers: List[str]):
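        """Download daily aggTrades archives for all valid tickers.

        Tickers are validated against /api/v3/exchangeInfo, and one extra day
        (T-1) before ``startDate`` is downloaded so that missing values at the
        start of the first day can be forward-filled later.
        """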
        # all valid symbols traded on v3 api
        response = urllib.request.urlopen(
            "https://api.binance.com/api/v3/exchangeInfo"
        ).read()
        valid_symbols = list(
            map(
                lambda symbol: symbol["symbol"],
                json.loads(response)["symbols"],
            )
        )

        for tic in tickers:
            if tic not in valid_symbols:
                print(tic + " not a valid ticker, removing from download")
        tickers = list(set(tickers) & set(valid_symbols))
        num_symbols = len(tickers)
        # not adding tz yet
        # for ffill missing data on starting on first day 00:00:00 (if any)
        tminus1 = (self.convert_to_date_object(startDate) - dt.timedelta(1)).strftime(
            "%Y-%m-%d"
        )
        dates = pd.date_range(start=tminus1, end=endDate)
        dates = [date.strftime("%Y-%m-%d") for date in dates]
        return self.download_daily_aggTrades(
            tickers, num_symbols, dates, tminus1, endDate
        )

    # Dict[str, List[str]] -> pd.DataFrame
    def combine_raw(self, csv_map):
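        """Resample the downloaded aggTrades CSVs into a single 1-second OHLCV frame.

        The first CSV per ticker is the extra T-1 day; it is only used to
        forward-fill the opening seconds of the real start date and is not
        kept in the output.
        """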
        # same format as jingyang's current data format
        final_df = pd.DataFrame()
        # using AggTrades with headers from https://github.com/binance/binance-public-data/
        colNames = [
            "AggregatetradeId",
            "Price",
            "volume",
            "FirsttradeId",
            "LasttradeId",
            "time",
            "buyerWasMaker",
            "tradeWasBestPriceMatch",
        ]
        for tic in csv_map:
            security = pd.DataFrame()
            for i, csv in enumerate(csv_map[tic]):
                # aggTrades timestamps are epoch milliseconds
                dailyticks = pd.read_csv(csv, names=colNames)
                dailyticks["time"] = pd.to_datetime(dailyticks["time"], unit="ms")
                dailyticks = dailyticks.set_index("time")
                dailyfinal = dailyticks.resample("1s").agg(
                    {"Price": "ohlc", "volume": "sum"}
                )
                dailyfinal.columns = dailyfinal.columns.droplevel(0)
                # favor continuous series
                # dailyfinal.dropna(inplace=True)

                # implemented T-1 day ffill day start missing values
                # guaranteed first csv is tminus1 day
                if i == 0:
                    tmr = dailyfinal.index[0].date() + dt.timedelta(1)
                    tmr_dt = dt.datetime.combine(tmr, dt.time.min)
                    last_time_stamp_dt = dailyfinal.index[-1].to_pydatetime()
                    s_delta = (tmr_dt - last_time_stamp_dt).seconds
                    lastsample = dailyfinal.iloc[-1:]
                    lastsample.index = lastsample.index.shift(s_delta, "s")
                else:
                    day_dt = dailyfinal.index[0].date()
                    day_str = day_dt.strftime("%Y-%m-%d")
                    nextday_str = (day_dt + dt.timedelta(1)).strftime("%Y-%m-%d")
                    if dailyfinal.index[0].second != 0:
                        # append last sample
                        dailyfinal = pd.concat([lastsample, dailyfinal])
                    # otherwise, just reindex and ffill
                    dailyfinal = dailyfinal.reindex(
                        pd.date_range(day_str, nextday_str, freq="1s")[:-1],
                        method="ffill",
                    )
                    # save reference info (guaranteed to be :59)
                    lastsample = dailyfinal.iloc[-1:]
                    lastsample.index = lastsample.index.shift(1, "s")

                    if dailyfinal.shape[0] != 86400:
                        raise ValueError("everyday should have 86400 datapoints")

                    # only save real startDate - endDate
                    security = pd.concat([security, dailyfinal])

            security.ffill(inplace=True)
            security["tic"] = tic
            final_df = pd.concat([final_df, security])
        return final_df

    def fetch_n_combine(self, startDate, endDate, tickers):
        # return combine_raw(fetchAggTrades(startDate, endDate, tickers))
        mapping = self.fetch_aggTrades(startDate, endDate, tickers)
        return self.combine_raw(mapping)
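

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only). The "binance" data_source
    # label, tickers, and save path below are example values, not requirements
    # of this module; it must be run where the ``meta`` package is importable.
    processor = Binance(
        data_source="binance",
        start_date="2021-01-01",
        end_date="2021-02-01",
        time_interval="1d",
    )
    processor.download_data(["BTCUSDT", "ETHUSDT"], save_path="./data/dataset.csv")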