File size: 5,476 Bytes
de6e775
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import calendar
from datetime import datetime
from typing import List

import ccxt
import numpy as np
import pandas as pd

from meta.data_processors._base import _Base

# from basic_processor import _Base


class Ccxt(_Base):
    """Data processor that downloads OHLCV candles from Binance through ccxt.

    The requested window (``start_date`` / ``end_date``, formatted as
    ``"%Y%m%d %H:%M:%S"``) is interpreted as UTC. It is fetched in chunks of
    half a day for the ``"1Min"`` interval and one day for any other interval.
    """

    # Candle fields that follow the timestamp in every row handled here.
    _OHLCV_FIELDS = ["open", "high", "low", "close", "volume"]

    # Number of candles requested per chunk start, keyed by interval name.
    # Each value spans exactly one chunk (720 x 1 min = 12 h, 288 x 5 min = 24 h,
    # 24 x 1 h = 24 h, 1 x 1 d = 24 h). Unknown intervals use the default.
    _CANDLES_PER_CHUNK = {"1Min": 720, "5Min": 288, "1H": 24, "1D": 1}
    _DEFAULT_CANDLES_PER_CHUNK = 1000

    # Translation from the interval names used by this class to the timeframe
    # strings ccxt/Binance understand. Values that are not listed (for example
    # an already ccxt-style "1d") are passed through unchanged.
    _CCXT_TIMEFRAMES = {"1Min": "1m", "5Min": "5m", "1H": "1h", "1D": "1d"}

    def __init__(
        self,
        data_source: str,
        start_date: str,
        end_date: str,
        time_interval: str,
        **kwargs,
    ):
        """Forward all arguments to ``_Base`` and create the Binance client."""
        super().__init__(data_source, start_date, end_date, time_interval, **kwargs)
        self.binance = ccxt.binance()

    def download_data(
        self, ticker_list: List[str], save_path: str = "./data/dataset.csv"
    ):
        """Download candles for every ticker and store them in ``self.dataframe``.

        The result is a DataFrame indexed by the candle times of the first
        ticker, with ``(ticker, field)`` MultiIndex columns for the fields
        open/high/low/close/volume. It is then handed to ``self.save_data``.

        Args:
            ticker_list: Symbols to download, e.g. ``["BTC/USDT"]``.
            save_path: Destination passed to ``self.save_data``.

        Raises:
            ValueError: If ``ticker_list`` is empty, or (raised by pandas) if
                a later ticker returns a different number of candles than
                the first one, because rows are aligned by position.
        """
        # len() instead of truthiness so array-like ticker lists also work.
        if len(ticker_list) == 0:
            raise ValueError("ticker_list must contain at least one ticker")

        fields = self._OHLCV_FIELDS
        crypto_column = pd.MultiIndex.from_product([ticker_list, fields])

        # The chunk start times depend only on the requested window, not on
        # the ticker, so they are computed once up front.
        start_dt = datetime.strptime(self.start_date, "%Y%m%d %H:%M:%S")
        end_dt = datetime.strptime(self.end_date, "%Y%m%d %H:%M:%S")
        start_timestamp = calendar.timegm(start_dt.utctimetuple())
        end_timestamp = calendar.timegm(end_dt.utctimetuple())
        # Seconds per chunk: 720 minutes for 1-minute candles, else one day.
        chunk_seconds = 60 * 720 if self.time_interval == "1Min" else 60 * 1440
        date_list = [
            datetime.utcfromtimestamp(float(ts))
            for ts in range(start_timestamp, end_timestamp, chunk_seconds)
        ]

        dataset = None
        for ticker in ticker_list:
            df = self.ohlcv(date_list, ticker, self.time_interval)
            if dataset is None:
                # The first ticker defines the time index of the whole table.
                dataset = pd.DataFrame(columns=crypto_column, index=df["time"].values)
            temp_col = pd.MultiIndex.from_product([[ticker], fields])
            dataset[temp_col] = df[fields].values

        # Nothing to report when the exchange returned no candles at all.
        if len(df) > 0:
            print("Actual end time: " + str(df["time"].values[-1]))
        self.dataframe = dataset

        self.save_data(save_path)

        print(
            f"Download complete! Dataset saved to {save_path}. \nShape of DataFrame: {self.dataframe.shape}"
        )

    # NOTE: legacy implementation kept for reference only; it relies on an
    # `Sdf` name that is not imported among the imports visible in this file.
    # def add_technical_indicators(self, df, pair_list, tech_indicator_list = [
    #     'macd', 'boll_ub', 'boll_lb', 'rsi_30', 'dx_30',
    #     'close_30_sma', 'close_60_sma']):
    #     df = df.dropna()
    #     df = df.copy()
    #     column_list = [pair_list, ['open','high','low','close','volume']+(tech_indicator_list)]
    #     column = pd.MultiIndex.from_product(column_list)
    #     index_list = df.index
    #     dataset = pd.DataFrame(columns=column,index=index_list)
    #     for pair in pair_list:
    #         pair_column = pd.MultiIndex.from_product([[pair],['open','high','low','close','volume']])
    #         dataset[pair_column] = df[pair]
    #         temp_df = df[pair].reset_index().sort_values(by=['index'])
    #         temp_df = temp_df.rename(columns={'index':'date'})
    #         crypto_df = Sdf.retype(temp_df.copy())
    #         for indicator in tech_indicator_list:
    #             temp_indicator = crypto_df[indicator].values.tolist()
    #             dataset[(pair,indicator)] = temp_indicator
    #     print('Succesfully add technical indicators')
    #     return dataset

    def df_to_ary(self, pair_list, tech_indicator_list=None):
        """Split ``self.dataframe`` into price, indicator and date arrays.

        Rows containing any NaN are dropped first.

        Args:
            pair_list: Tickers (first column level) to extract.
            tech_indicator_list: Indicator names (second column level).
                Defaults to macd, boll_ub, boll_lb, rsi_30, dx_30,
                close_30_sma and close_60_sma. These columns are not created
                by ``download_data``; presumably they are added to
                ``self.dataframe`` elsewhere before this is called.

        Returns:
            Tuple ``(price_array, tech_array, date_ary)``: the ``close``
            columns, the indicator columns and the index values.
        """
        if tech_indicator_list is None:
            tech_indicator_list = [
                "macd",
                "boll_ub",
                "boll_lb",
                "rsi_30",
                "dx_30",
                "close_30_sma",
                "close_60_sma",
            ]
        df = self.dataframe.dropna()
        date_ary = df.index.values
        close_columns = pd.MultiIndex.from_product([pair_list, ["close"]])
        tech_columns = pd.MultiIndex.from_product([pair_list, tech_indicator_list])
        price_array = df[close_columns].values
        tech_array = df[tech_columns].values
        return price_array, tech_array, date_ary

    def min_ohlcv(self, dt, pair, limit):
        """Fetch up to ``limit`` one-minute candles for ``pair`` starting at ``dt``.

        ``dt`` is read as UTC and converted to epoch milliseconds for ccxt.
        """
        since = calendar.timegm(dt.utctimetuple()) * 1000
        return self.binance.fetch_ohlcv(
            symbol=pair, timeframe="1m", since=since, limit=limit
        )

    def ohlcv(self, dt, pair, period="1d"):
        """Fetch candles for ``pair`` at every chunk start in ``dt``.

        Args:
            dt: Iterable of datetimes (read as UTC) at which a chunk starts.
            pair: Symbol passed to ccxt, e.g. ``"BTC/USDT"``.
            period: Interval name. ``"1Min"``, ``"5Min"``, ``"1H"`` and
                ``"1D"`` are translated to the ccxt timeframes ``"1m"``,
                ``"5m"``, ``"1h"`` and ``"1d"``; any other value is handed to
                ccxt unchanged.

        Returns:
            DataFrame with a ``time`` column (UTC, naive datetimes) and
            float64 ``open``/``high``/``low``/``close``/``volume`` columns.
        """
        limit = self._CANDLES_PER_CHUNK.get(period, self._DEFAULT_CANDLES_PER_CHUNK)
        timeframe = self._CCXT_TIMEFRAMES.get(period, period)

        candles = []
        for chunk_start in dt:
            if period == "1Min":
                candles.extend(self.min_ohlcv(chunk_start, pair, limit))
            else:
                since = calendar.timegm(chunk_start.utctimetuple()) * 1000
                candles.extend(
                    self.binance.fetch_ohlcv(
                        symbol=pair, timeframe=timeframe, since=since, limit=limit
                    )
                )

        df = pd.DataFrame(candles, columns=["time", *self._OHLCV_FIELDS])
        # Candle timestamps are epoch milliseconds; convert them in UTC so the
        # index lines up with the UTC window used to request the data.
        df["time"] = [datetime.utcfromtimestamp(float(ms) / 1000) for ms in df["time"]]
        df = df.astype({field: np.float64 for field in self._OHLCV_FIELDS})
        return df